Spring Cloud Stream: @StreamListener дважды обрабатывает сообщения

Я использую Spring Cloud Stream (Edgware.SR5) с Spring Boot (1.5.10.RELEASE). Мой @StreamListener дважды обрабатывает каждое полученное сообщение.

Идея примера - опубликовать сообщение в очереди и обработать его.

Услуга:

@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {

    @Publisher(channel = ExampleBindings.OUTPUT)
    public String queue(String message){
        return message;
    }

    @StreamListener(ExampleBindings.INPUT)
    public void dequeue(String message){
        System.out.println("New message: " + message);
    }
}

Привязки:

public interface ExampleBindings {

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(ExampleBindings.INPUT)
    SubscribableChannel input();

    @Output(ExampleBindings.OUTPUT)
    MessageChannel output();
}

application.properties:

spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

Конфигурация (для внедрения прокси-сервиса в тест):

@Configuration
public class ExampleConfig {

    @Bean
    public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
         PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
            new PublisherAnnotationBeanPostProcessor();
        publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
        return publisherAnnotationBeanPostProcessor;
    }
}

Тестовое задание:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {

    @Autowired
    private ExampleService exampleService;

    @Test
    public void testQueue() throws InterruptedException {
        exampleService.queue("Hello!");
        Thread.sleep(1000);//Wait for message processing
        System.out.println("Ready!");
    }
}

У меня следующий результат:

17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!

Я не могу понять, в чем проблема с моей конфигурацией, или, если это какая-то ошибка, что-нибудь посоветовать?

Спасибо!

ИЗМЕНЕНО:

Я загрузил (не) рабочий пример здесь

Вы можете создать экземпляр RabbitMQ, используя:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

person italktothewind    schedule 15.01.2019    source источник
comment
Вы показываете Processor для _2 _ / _ 3_, но я вижу в логах input2. Тебе это что-нибудь говорит?   -  person Artem Bilan    schedule 16.01.2019
comment
Я вручную редактировал журналы, потому что не хочу раскрывать бизнес-логику, я отредактирую ее.   -  person italktothewind    schedule 16.01.2019
comment
Сейчас его отредактировали.   -  person italktothewind    schedule 16.01.2019
comment
OK. Можно ли где-нибудь на GitHub найти какой-нибудь простой образец для игры? Спасибо   -  person Artem Bilan    schedule 16.01.2019
comment
Эти сообщения не совпадают. Контент может быть одинаковым, но id, amqp_consumerTag и т. Д. Все разные. Итак, действительно, разместите где-нибудь воспроизводимый образец (вы можете исключить бизнес-логику)   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
Я загрузил этот репозиторий: github.com/italktothewind/spring-cloud-examples, Благодарность!   -  person italktothewind    schedule 16.01.2019
comment
Кроме того, вы уверены, что ваша очередь пуста перед запуском теста?   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
Да, я перезапустил докер (docker run -p 5672: 5672 -p 15672: 15672 rabbitmq: 3-management), поэтому перед запуском тестов очереди были пусты.   -  person italktothewind    schedule 16.01.2019
comment
Я обнаружил проблему, спасибо, это произошло из-за обходного пути в классе конфигурации для использования инъекционных сервисов с использованием @Publisher.   -  person italktothewind    schedule 16.01.2019
comment
правильно, я просто собирался сказать, что удалите @Bean для постпроцессора и просто используйте @EnablePublisher("defaultChannel")   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
При этом у вас много чего происходит, много избыточной конфигурации, а также вы не используете среду тестирования для spring-cloud-stream. Возможно, вам стоит пересмотреть / рефакторинг   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
К сожалению, @EnablePublisher недостаточно для инъекционной службы: stackoverflow.com/questions/54150939/   -  person italktothewind    schedule 16.01.2019
comment
Достаточно. Я только что изменил ваш пример. . .   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
Если вы удалите конфигурацию компонента, у вас будет следующее исключение: Вызвано: java.lang.IllegalStateException: метод @StreamListener 'dequeue' обнаружен в целевом классе компонента 'ExampleService', но не найден ни в одном интерфейсе (ах) для прокси-сервера JDK компонента. Либо подтяните метод к интерфейсу, либо переключитесь на прокси подкласса (CGLIB), установив для атрибута proxy-target-class / proxyTargetClass значение true   -  person italktothewind    schedule 16.01.2019
comment
Выполните mvn clean install (тест запускается повторно)   -  person italktothewind    schedule 16.01.2019
comment
Как я уже сказал, у меня есть ваш пример, работающий с @EnablePublisher и удаленной конфигурацией Bean   -  person Oleg Zhurakousky    schedule 16.01.2019
comment
Я попытался добавить EnablePublisher и удалить конфигурацию bean-компонента, но исключение все еще возникает.   -  person italktothewind    schedule 16.01.2019


Ответы (3)


Я обнаружил, что @Publisher публиковал дважды из-за конфигурации в ExampleConfig. Эта новая конфигурация (заимствованная из здесь) работает нормально. :

@Bean
public static BeanFactoryPostProcessor bfpp() {
    return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
        PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}
person italktothewind    schedule 15.01.2019
comment
тебе не нужно setProxyTargetClass(true) - person Oleg Zhurakousky; 16.01.2019
comment
Если я удалю эту конфигурацию Bean, у меня будет эта трассировка стека: Вызвано: java.lang.IllegalStateException: @StreamListener метод 'dequeue' найден в целевом классе bean-компонента 'ExampleService', но не найден ни в одном интерфейсе (ах) для прокси-сервера JDK bean. Либо подтяните метод к интерфейсу, либо переключитесь на прокси подкласса (CGLIB), установив для атрибута proxy-target-class / proxyTargetClass значение true - person italktothewind; 16.01.2019
comment
Другой ответ на эту проблему - явное имя bean-компонента для вашего PublisherAnnotationBeanPostProcessor, и оно такое: IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME. Проблема в том, что Framework по-прежнему создает дефолтный. Итак, ваш класс действительно дважды проксируется. - person Artem Bilan; 16.01.2019

Я запускал свое приложение в режиме отладки (intellij), из-за чего каким-то образом смещение не обновлялось. Попробуйте запустить в режиме запуска, и это решило мою проблему.

person Shantam Mittal    schedule 22.08.2019

Я думаю, что из конфигурации вы пытаетесь снова опубликовать то же сообщение в том же месте назначения dest_1.

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

И из журнала видно, что у 2-го сообщения другой ID

id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888
person Sabuj Das    schedule 15.01.2019
comment
Я думаю, что input1 - это очередь dest_1.group1, а output - обмен dest_1. - person italktothewind; 16.01.2019
comment
Да, действительно, это были разные идентификаторы, потому что они были опубликованы дважды, спасибо! - person italktothewind; 16.01.2019