Spring Kafka Producer не отправляет в Kafka 1.0.0 (Magic v1 не поддерживает заголовки записей)

Я использую эту настройку docker-compose для локальной настройки Kafka: https://github.com/wurstmeister/kafka-docker/

docker-compose up работает нормально, создание тем через шелл работает нормально.

Теперь пытаюсь подключиться к Кафке через spring-kafka:2.1.0.RELEASE

При запуске приложения Spring оно печатает правильную версию Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

Я пытаюсь отправить сообщение, как это

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Отправка на стороне клиента завершается с ошибкой

UnknownServerException: The server experienced an unexpected error when processing the request

В консоли сервера я получаю сообщение Magic v1 не поддерживает заголовки записей.

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Поиск в Google предполагает конфликт версий, но версия, похоже, подходит (org.apache.kafka:kafka-clients:1.0.0 находится в пути к классам).

Любые подсказки? Спасибо!

Изменить: я сузил источник проблемы. Отправка простых строк работает, но отправка Json через JsonSerializer приводит к данной проблеме. Вот содержимое моей конфигурации производителя:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

person DerM    schedule 23.12.2017    source источник
comment
Это не имеет смысла; (получение этого сообщения на стороне сервера). Если бы версия клиента была старше, она не отправляла бы никаких заголовков, поэтому все должно быть хорошо (я тестировал клиент 1.0.0 с брокером 0.10, и он работает, пока вы не пытаетесь отправить заголовки). С клиентом 1.0.0 пустой RecordHeaders отправляется (клиентом), когда шаблон не отправляет никаких заголовков.   -  person Gary Russell    schedule 23.12.2017
comment
Похоже, что в именах образов не указана версия, поэтому вы можете использовать более старый кешированный образ докера и более новый клиент. Клиент версии 1.0, отправляющий заголовки брокеру версии 0.10, получит эту ошибку. Попробуйте проверить версию образа Docker и Docker вытащить новейший образ брокера 1.0.   -  person Hans Jespersen    schedule 23.12.2017
comment
После обновления версии Kafka до последней версии я не получал Magic v1, не поддерживающего исключение заголовков записей, и код работал как шарм.   -  person Ankit bansal    schedule 17.07.2019


Ответы (4)


У меня была аналогичная проблема. Kafka добавляет заголовки по умолчанию, если мы используем JsonSerializer или JsonSerde для значений. Чтобы предотвратить эту проблему, нам нужно отключить добавление информационных заголовков.

если у вас все в порядке с сериализацией json по умолчанию, используйте следующее (ключевой момент здесь — ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

но если вам нужен пользовательский JsonSerializer с определенным ObjectMapper (например, с PropertyNamingStrategy.SNAKE_CASE), вы должны отключить явное добавление информационных заголовков в JsonSerializer, так как spring kafka игнорирует свойство DefaultKafkaProducerFactory ADD_TYPE_INFO_HEADERS (как по мне, это плохой дизайн spring kafka)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

или если мы используем JsonSerde, то:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
person Vasyl Sarzhynskyi    schedule 28.05.2018
comment
Добавление этой строки решило мою проблему props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - person mojtab23; 02.07.2018
comment
Что делать, если мне нужен заголовок типа? Как я могу преодолеть проблему? - person Dikla; 24.10.2019

Решено. Проблема не в брокере, не в каком-то кеше докеров и не в приложении Spring.

Проблема заключалась в потребителе консоли, который я использовал параллельно для отладки. Это был «старый» потребитель, начинавшийся с kafka-console-consumer.sh --topic=topic --zookeeper=...

На самом деле он выводит предупреждение при запуске: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

Следует использовать «нового» потребителя с параметром --bootstrap-server (особенно при использовании Kafka 1.0 с JsonSerializer). Примечание. Использование здесь старого потребителя действительно может повлиять на производителя.

person DerM    schedule 23.12.2017
comment
Я не запускаю никаких потребителей параллельно, но все же эта проблема возникает. Я также попытался добавить props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); но все равно выдает ту же ошибку, что magicv1 не поддерживает добавление заголовка записи. Не могли бы вы помочь здесь - person Gopal; 22.10.2020

Я только что провел тест на этом образе докера без проблем...

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

ИЗМЕНИТЬ

У меня до сих пор работает...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz
person Gary Russell    schedule 23.12.2017
comment
Спасибо за проверку! Я сузил источник проблемы. Если я отправлю простые строки, это сработает! Но если я использую JsonSerializer, возникает проблема. Смотрите обновление. - person DerM; 23.12.2017
comment
У меня все еще работает - см. Мое редактирование. JsonSerializer добавляет заголовки по умолчанию, поэтому он определенно выглядит так, как будто брокер в вашем образе докера ‹ 0,11. Вы можете подтвердить это, установив для свойства производителя JsonSerializer.ADD_TYPE_INFO_HEADERS значение false. Это свойство не позволяет добавлять информацию о типе в заголовки. - person Gary Russell; 23.12.2017
comment
Спасибо! Вы правы, это работает с данным свойством. Я просто не понимаю. Когда я запускаю docker-compose, он определенно печатает kafka_1 | [2017-12-23 20:51:53,289] INFO Kafka version : 1.0.0. Я также пытался воссоздать, удалить все изображения и т. д., например: docker-compose down && docker-compose build --no-cache && docker-compose up - person DerM; 23.12.2017
comment
Попробуйте docker logs <containerId> > server.log в моем, я вижу это: [2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser). - person Gary Russell; 24.12.2017
comment
Да.. То же самое здесь. - person DerM; 24.12.2017
comment
Если я /bin/bash в образ, я также вижу все банки kafka_2.12-1.0.0. - person DerM; 24.12.2017
comment
Странно - что еще более странно, если я говорю с сервером 0.10.2.0 из S-K 2.1.0 и JsonSerializer, я получаю исключение на клиенте, как и ожидалось Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers. Как я сказал в своем первоначальном комментарии к вашему вопросу, старые брокеры ничего не знают о заголовках, поэтому я не понимаю, как вы можете получить эту ошибку на сервере, если клиент не старый. Вы используете свой клиент на AWS? Я уже слышал о некоторых странностях с кодом на AWS, работающем под старым kafka-clients, даже если приложение упаковано с правильным. - person Gary Russell; 24.12.2017
comment
Найдите 2017-12-23 16:22:16.500 INFO 55322 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 в журнале клиента. - person Gary Russell; 24.12.2017
comment
Итак.. еще одно обновление. Я думаю, что проблема не в брокере и не в приложении Spring. Я использовал потребителя консоли параллельно с приложением Spring для отладки (на основе этот учебник ). Я почти уверен, что проблема возникает при использовании старого потребителя (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK) с параметром zookeper вместо bootstrap-server. Что мне кажется интересным, так это то, что этот потребитель приводит к исключению UnknownServerException в производителе (Spring). - person DerM; 24.12.2017
comment
Попробуйте использовать параметр --bootstrap-server вместо --zookeeper; в этом случае console-consumer будет использовать нового потребителя (но вы не сможете видеть заголовки с потребителем консоли, несмотря ни на что). Но я согласен, что странно, что потребитель может влиять на производителя. Может быть, есть какая-то логика, которая говорит, что у нас есть старый потребитель, прикрепленный к этой теме, поэтому вы не можете отправлять заголовки. - person Gary Russell; 24.12.2017
comment
Да, это работает. Большое спасибо за уделенное время, с праздником! - person DerM; 24.12.2017

вы используете версию kafka ‹=0.10.x.x, как только вы используете это, вы должны установить для JsonSerializer.ADD_TYPE_INFO_HEADERS значение false, как показано ниже.

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

для вашего производителя заводских свойств.

Если вы используете версию kafka> 0.10.x.x, она должна работать нормально.

person Kshitiz Agarwal    schedule 18.03.2020