Исключения времени выполнения приложения не отправляются в errorChannel или ServiceActivator, которые не могут прослушивать errorChannel

После прослушивания темы kafka с помощью @StreamListener, при RuntimeException глобальный канал erroChannel или канал errorChannel для конкретной темы (topic.group.errors) не получает сообщения об ошибке. @ServiceActivator ничего не получает.

Зависимости POM: Greenwich.RELEASE

            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

application.properties

spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true

spring.cloud.stream.kafka.streams.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName=input_deadletter
spring.cloud.stream.kafka.streams.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.content-Type=application/*+avro
spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.schemaRegistryClient.endpoint.schema.avro.schema-locations=classpath:avro/*.avsc

spring.cloud.stream.kafka.streams.binder.brokers=localhost

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8082

spring.cloud.stream.kafka.streams.binder.application-id=myGroup
spring.cloud.stream.kafka.streams.binder.serdeError=sendtodlq

Я вижу в журналах, что активатор услуги зарегистрирован и подписан на каналы ошибок. Все потоки останавливаются и переходят в режим выключения после возникновения исключения времени выполнения.

Registering beans for JMX exposure on startup
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel input.myGroup.errors
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name="input-myGroup.errors"': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name="input.myGroup.errors"] org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel errorChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=errorChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=errorChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel nullChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=nullChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=nullChannel]
 org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler errorLogger
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal]
 org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.error.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.errorGlobal.serviceActivator
 org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint]
org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor - No @KafkaListener annotations found on bean type: class org.springf
    @SendTo(MyStreams.OUTPUT)
    public KStream<Key, MyEntity> process(KStream<Key, Envelope> myStreamObject) {

        return myStreamObject.mapValues(this::transform);
    }

@ServiceActivator(inputChannel = "input.myGroup.errors") //channel name 'input.myGroup.errors'
    public void error(Message<?> message) {
        System.out.println("Handling ERROR:  " + message);
 }

@ServiceActivator(inputChannel = "errorChannel")
    public void errorGlobal(Message<?> message) {
        System.out.println("Handling ERROR: GLOBAL " + message);
    }



Ответы (1)


Связующее устройство kafka streams не основано на MessageChannels, поэтому нет Message<?> для отправки в канал ошибок.

Стандартный связыватель kafka - это MessageChannelBinder и поддерживает канал ошибок.

В Kafka Streams вам нужно реализовать собственную обработку ошибок.

person Gary Russell    schedule 07.06.2019
comment
Я ждал твоего вердикта по этому поводу, Гэри, спасибо. Может быть, было бы здорово, если бы документация по облачным потокам Spring явно разъясняла это. - person NiBa; 08.06.2019
comment
Я включил DLQ для ошибок serde, и он успешно пересылает неудачные сообщения в DLQ. Есть ли рекомендуемый способ отправки ошибок приложения в тот же DLQ? - person NiBa; 08.06.2019
comment
Не задавайте новых вопросов в комментариях; это не помогает людям находить ответы. Задайте новый вопрос. - person Gary Russell; 10.06.2019