Проблема с Spring Cloud Stream с использованием клиента реестра схемы по умолчанию вместо клиента реестра схемы Avro

Мы используем Kafka с Spring Cloud Stream, и нам нужно подключиться к Confluent Schema Registry в нашем компоненте Spring Boot, см. https://github.com/donalthurley/KafkaConsumeScsAndConfluent.

Мы добавили следующую конфигурацию для создания необходимого bean-компонента ConfluentSchemaRegistryClient, см. https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java, который должен переопределить реестр схем по умолчанию из Весенний поток облаков.

Однако после некоторых развертываний мы периодически наблюдали следующий сбой.

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel

Основная причина показывает эту трассировку стека

Caused by: java.lang.NullPointerException
    at     org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:322)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:611)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)

Тот факт, что DefaultSchemaRegistryClient вызывается AvroSchemaRegistryClientMessageConverter, указывает нам на наличие проблемы с подключением нашего bean-компонента ConfluentSchemaRegistryClient.

Требуется ли что-то еще в нашей конфигурации для обеспечения правильного подключения bean-компонента ConfluentSchemaRegistryClient?


person Donal Hurley    schedule 05.11.2018    source источник
comment
Было бы полезно, если вы разместите проект (на Github или где-то еще) с минимальным набором настроек, воспроизводящих проблему. Трудно предположить, что может быть не так, хотя NPE определенно пахнет ошибкой с нашей стороны.   -  person Oleg Zhurakousky    schedule 05.11.2018
comment
Вот рабочий пример, который показывает, как приложение Spring Cloud Stream подключается к реестру схем Confluent. Можете ли вы сравнить заметки и увидеть, чего не хватает в вашей конфигурации? github. ru / spring-cloud / spring-cloud-stream-samples / tree / master /   -  person sobychacko    schedule 05.11.2018


Ответы (2)


У меня это сработало. Вот что я сделал:

  • использовал класс конфигурации точно так же, как и вы
  • использовали аннотацию @EnableSchemaRegistryClient в проекте
  • добавлен сериализатор avro в путь к классам: io.confluent:kafka-avro-serializer
  • установите следующие свойства:

    spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url = адрес вашего реестра spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader = true

где канал соответствует названию канала в вашем приложении.

Я думаю, что последнее свойство очень важно, чтобы сообщить Spring использовать сериализатор Avro вместо сериализатора по умолчанию.

Я использую Spring Cloud Stream Elmhurst.RELEASE, поэтому имена свойств могут немного отличаться, если вы используете другую версию.

person HL'REB    schedule 05.11.2018

Я переместил аннотацию @EnableSchemaRegistryClient в класс приложения проекта, см. donalthurley / KafkaConsumeScsAndConfluent / commit / b4cf5427d7ab0a4fed619fe54b042890f5ccb594 и повторно развернул, и это устранило проблему, которая возникла у меня при развертывании в нашей среде.

Я аннотировал свои классы производителя и потребителя с помощью аннотации @EnableSchemaRegistryClient.

Во всех моих локальных тестах это работало против моего локального реестра конфлюентных схем докеров. Однако при развертывании в наших средах он работал большую часть времени, но иногда выходил из строя после некоторых развертываний.

Мне не удалось воспроизвести это локально.

Я также заметил при локальном тестировании, что получаю ту же самую трассировку стека исключений с нулевым указателем, если удаляю конфигурацию для реестра Confluent Schema Registry.

Итак, я думаю, что проблема заключается в том, что bean-компонент AvroSchemaRegistryClientMessageConverter не был связан с bean-компонентом реестра объединенной схемы, когда аннотация @EnableSchemaRegistryClient отсутствовала в классе приложения проекта.

Я не понимаю, почему именно это было бы необходимо, но думаю, что это могло решить проблему.

person Donal Hurley    schedule 06.11.2018