Я пытаюсь построить поток интеграции, который предотвратит потерю сообщений во время доставки к брокеру AMQP (rabbitMQ). В случае остановки брокера я наблюдаю неожиданное для себя поведение:
- Неудачные сообщения сохраняются в хранилище сообщений, но ненадолго. Этот поток не ожидает доступности брокера, он извлекает сообщения из хранилища сообщений, даже если брокер все еще остановлен.
- В случае успешного перезапуска rabbitmq записи из хранилища сообщений (если они еще присутствуют) не доставляются в очередь.
Пожалуйста, помогите мне в расследованиях. Пример кода:
@Bean
public MessageChannel messageStoreBackedChannel() {
return new QueueChannel(
new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
);
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows
.from("messageStoreBackedChannel")
.channel("amqpMessageChannel")
.get();
}
@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlows
.from("amqpMessageChannel")
.handle(message -> System.out.println(message.getPayload()))
.get();
}
@Bean
public MessageChannel amqpMessageChannel() {
return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}