Хранилище сообщений для постоянной доставки брокеру AMQP в Spring Integration

Я пытаюсь построить поток интеграции, который предотвратит потерю сообщений во время доставки к брокеру AMQP (rabbitMQ). В случае остановки брокера я наблюдаю неожиданное для себя поведение:

  1. Неудачные сообщения сохраняются в хранилище сообщений, но ненадолго. Этот поток не ожидает доступности брокера, он извлекает сообщения из хранилища сообщений, даже если брокер все еще остановлен.
  2. В случае успешного перезапуска rabbitmq записи из хранилища сообщений (если они еще присутствуют) не доставляются в очередь.

Пожалуйста, помогите мне в расследованиях. Пример кода:

 @Bean
public MessageChannel messageStoreBackedChannel() {
    return new QueueChannel(
            new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
    );
}

 @Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows
            .from("messageStoreBackedChannel")
            .channel("amqpMessageChannel")
            .get();
}

@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlows
            .from("amqpMessageChannel")
            .handle(message -> System.out.println(message.getPayload()))
            .get();
}


@Bean
public MessageChannel amqpMessageChannel() {
    return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}

@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
    var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());

    return jdbcChannelMessageStore;
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

person Vladyslav Bozhenko    schedule 01.04.2021    source источник


Ответы (1)


Рассмотрите возможность настройки конечной точки между вашим .from("messageStoreBackedChannel").channel("amqpMessageChannel") как transactional().

Что-то вроде этого:

.from("messageStoreBackedChannel")
.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
.channel("amqpMessageChannel")

Таким образом, всякий раз, когда доставка в amqpMessageChannel терпит неудачу, транзакция будет откатываться, и неудачное сообщение будет возвращаться в хранилище до следующего опроса.

Конечно, вы можете остановить эту конечную точку bridge, когда получите сообщение об ошибке при подключении к RabbitMQ. Но как тогда определить, что связь возвращается?..

person Artem Bilan    schedule 01.04.2021
comment
.bridge(e -> e.transactional()) — не изменил поведение для моего случая. Первая неудачная повторная попытка (org.springframework.messaging.MessageDeliveryException: не удалось отправить сообщение на канал amqpMessageChannel) удаляет одну запись из хранилища сообщений. Есть ли у нас возможность сделать больше одной такой повторной попытки, прежде чем запись будет удалена из хранилища? - person Vladyslav Bozhenko; 01.04.2021
comment
Ой! Ждать. попробуйте это: .bridge(e -> e.poller(p -> p.fixedDelay(10).transactional())). Поллер должен быть транзакционным - person Artem Bilan; 01.04.2021
comment
Да, это решило мою проблему. Большое спасибо, это сэкономило мое время! - person Vladyslav Bozhenko; 01.04.2021
comment
У меня есть еще один вопрос по этой теме, пожалуйста. Есть ли у весенней интеграции какие-либо инструменты для настройки долговечности сообщения в хранилище сообщений (например, оно будет удалено через некоторое время или после некоторого количества попыток повторной отправки) - person Vladyslav Bozhenko; 02.04.2021
comment
Поскольку не все постоянные хранилища обеспечивают автоматическое определение времени жизни, у нас нет такой функции в фреймворке, на которую можно было бы положиться. Однако мы можем добавить что-то похожее на MessageGroupStoreReaper, но для каждого сообщения в группе - person Artem Bilan; 02.04.2021