Агрегатор Spring Integration выпускает только одну группу из канала, поддерживаемого AMQP.

У меня проблема с моим приложением Spring Boot, когда в моем агрегаторе обрабатывается только одна группа, а затем приложение перестает потреблять больше сообщений из очереди. Кажется, он обрабатывает группу только при запуске. Я перезапустил приложение, и оно обработало другую группу, но потом снова остановилось.

Это мой поток ниже.

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
                .split(userImportSplitter)
                .channel(Amqp.channel(connectionFactory)
                        .queueName(USER_QUEUE_NAME)
                        .prefetchCount(batchSize))
                .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500))
                .handle(userImporter)
                .get();

person Ivan    schedule 27.06.2020    source источник


Ответы (1)


Вероятно ваш userImportSplitter выдает такой же conrrelationId, поэтому в агрегаторе формируется только одна группа и по умолчанию она не удаляется после релиза или таймаута.

Рассмотрите возможность использования этих параметров:

.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(500)
                    .expireGroupsUponCompletion(true)
                    .expireGroupsUponTimeout(true))
person Artem Bilan    schedule 29.06.2020
comment
Все еще делаю то же самое. - person Ivan; 29.06.2020
comment
Можем ли мы посмотреть, что делает ваш userImportSplitter? Как это работает, если вы удалите этот `.channel(Amqp.channel()`) между ними? - person Artem Bilan; 29.06.2020
comment
Догадаться. Это была проблема с userImporter, выбрасывающим исключение. Я предполагаю, что мой новый вопрос заключается в том, как сделать так, чтобы это сообщение не блокировало очередь? - person Ivan; 29.06.2020
comment
См. ExpressionEvaluatingRequestHandlerAdvice: docs.spring.io/spring-integration/docs/current/reference/html/. Разделитель - это просто цикл, и он терпит неудачу так же, как если бы вы потерпели неудачу в обычном цикле Java for(). Итак, чтобы предотвратить преждевременный выход из цикла, нам нужно try..catch это внутреннее исключение и что-то сделать с неудавшимся элементом. - person Artem Bilan; 29.06.2020