Spring Integration: сообщение выпущено дважды после задержки

Я использую приведенный ниже фрагмент XML:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

Он выполняет следующие задачи:

  1. Потребляйте сообщения из очереди RabbitMQ с именем «команды».
  2. Задержать выполнение сообщения на 30 секунд.
  3. Приступить к дальнейшему выполнению сообщения после указанной задержки.

Если сообщение уже присутствует в очереди команд до запуска приложения с приведенным выше кодом, при запуске приложение дважды выполняет сообщение в отдельных потоках.

Кажется, я знаю, почему это происходит.

Spring перепланирует сообщения, сохраняемые в хранилище сообщений DelayHandler, после полной инициализации контекста приложения. Обратитесь к приведенному ниже фрагменту кода из DelayHandler.java:

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

Таким образом, если сообщение уже присутствовало в очереди RabbitMQ до запуска приложения, во время инициализации контекста Spring сообщение извлекается из очереди и добавляется в хранилище сообщений DelayHandler. После того, как инициализация контекста завершена, и если за это время сообщение не будет выпущено из хранилища сообщений, приведенный выше фрагмент кода перепланирует то же самое сообщение.

Теперь, когда два отдельных потока выполняют одно и то же сообщение, если один поток выполнился, то сообщение должно быть удалено из хранилища сообщений, а другой поток не должен продолжать выполнение.

Во время выполнения потока приведенный ниже фрагмент кода из DelayHandler.java позволяет второму потоку выпустить повторяющееся сообщение, что приводит к дублирующему выполнению одного и того же сообщения, поскольку хранилище сообщений является экземпляром SimpleMessageStore, и дальнейшая проверка для остановки выполнения не требуется. .

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

Это ошибка в Spring Integration?


person Saurav Garg    schedule 12.11.2014    source источник


Ответы (1)


Ну что ж!

Это действительно милая ошибка.

Спасибо, что указали на это!

Пожалуйста, поднимите вопрос JIRA, и мы рассмотрим его в следующем выпуске.

Я могу объяснить, что происходит.

Все Spring Integration начинают свою работу с Lifecycle.start(). В вашем случае <int-amqp:inbound-channel-adapter> получает сообщение от RabbitMQ и отправляет его в поток интеграции. И они delayed.

И только после start контекст приложения поднимает ContextRefreshedEvent. Уловив, что даже DelayHandler получает все сообщения от messageStore и, как вы заметили, reschedules их.

Следовательно, да, у нас может быть две запланированные задачи для одного и того же сообщения.

Что забавно, это только для SimpleMessageStore, потому что у него нет функции removeMessage для сообщений, которые хранятся в groups.

Я вижу несколько вариантов обходного пути:

  1. Задержите start на <int-amqp:inbound-channel-adapter>. Например, обработайте тот же ContextRefreshedEvent из <inbound-channel-adapter> и отправьте командное сообщение @amqpAdapter.start() в <control-bus>.

  2. Начиная с Spring Integration 4.1, доступен еще один вариант, и его имя — Idempotent Receiver. Используя это, вы можете отбросить сообщение duplicate, и я предполагаю, что idempotentKey точно равно messageId. Чистый шаблон идемпотентного получателя!

  3. И еще один вариант лежит под persistent MessageStore, где мы действительно можем рассчитывать на работу removeMessage.

Билет JIRA по этому вопросу: https://jira.spring.io/browse/INT-3560< /а>

person Artem Bilan    schedule 13.11.2014