Я использую приведенный ниже фрагмент XML:
<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
<int:delayer id="commandDelayer" default-delay="30000"/>
<int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>
<int:payload-type-router input-channel="commands">
....
....
Он выполняет следующие задачи:
- Потребляйте сообщения из очереди RabbitMQ с именем «команды».
- Задержать выполнение сообщения на 30 секунд.
- Приступить к дальнейшему выполнению сообщения после указанной задержки.
Если сообщение уже присутствует в очереди команд до запуска приложения с приведенным выше кодом, при запуске приложение дважды выполняет сообщение в отдельных потоках.
Кажется, я знаю, почему это происходит.
Spring перепланирует сообщения, сохраняемые в хранилище сообщений DelayHandler, после полной инициализации контекста приложения. Обратитесь к приведенному ниже фрагменту кода из DelayHandler.java
:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!this.initialized.getAndSet(true)) {
this.reschedulePersistedMessages();
}
}
Таким образом, если сообщение уже присутствовало в очереди RabbitMQ до запуска приложения, во время инициализации контекста Spring сообщение извлекается из очереди и добавляется в хранилище сообщений DelayHandler. После того, как инициализация контекста завершена, и если за это время сообщение не будет выпущено из хранилища сообщений, приведенный выше фрагмент кода перепланирует то же самое сообщение.
Теперь, когда два отдельных потока выполняют одно и то же сообщение, если один поток выполнился, то сообщение должно быть удалено из хранилища сообщений, а другой поток не должен продолжать выполнение.
Во время выполнения потока приведенный ниже фрагмент кода из DelayHandler.java
позволяет второму потоку выпустить повторяющееся сообщение, что приводит к дублирующему выполнению одного и того же сообщения, поскольку хранилище сообщений является экземпляром SimpleMessageStore, и дальнейшая проверка для остановки выполнения не требуется. .
private void doReleaseMessage(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore
|| ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
Это ошибка в Spring Integration?