Я использую эту настройку docker-compose для локальной настройки Kafka: https://github.com/wurstmeister/kafka-docker/
docker-compose up
работает нормально, создание тем через шелл работает нормально.
Теперь пытаюсь подключиться к Кафке через spring-kafka:2.1.0.RELEASE
При запуске приложения Spring оно печатает правильную версию Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
Я пытаюсь отправить сообщение, как это
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Отправка на стороне клиента завершается с ошибкой
UnknownServerException: The server experienced an unexpected error when processing the request
В консоли сервера я получаю сообщение Magic v1 не поддерживает заголовки записей.
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Поиск в Google предполагает конфликт версий, но версия, похоже, подходит (org.apache.kafka:kafka-clients:1.0.0
находится в пути к классам).
Любые подсказки? Спасибо!
Изменить: я сузил источник проблемы. Отправка простых строк работает, но отправка Json через JsonSerializer приводит к данной проблеме. Вот содержимое моей конфигурации производителя:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
}
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())
RecordHeaders
отправляется (клиентом), когда шаблон не отправляет никаких заголовков. - person Gary Russell   schedule 23.12.2017