Използвам следния XML фрагмент:
<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
<int:delayer id="commandDelayer" default-delay="30000"/>
<int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>
<int:payload-type-router input-channel="commands">
....
....
Изпълнява тези задачи:
- Консумирайте съобщения от опашката на RabbitMQ, наречени „команди“.
- Забавяне на изпълнението на съобщението с 30 секунди.
- Продължете с по-нататъшно изпълнение за съобщение след определеното забавяне.
Ако съобщението вече присъства в опашката с команди, преди приложението с горния код да бъде стартирано, при стартиране приложението изпълнява съобщението два пъти в отделни нишки.
Мисля, че знам защо се случва това.
Spring пренасрочва съобщенията, които се съхраняват в хранилището за съобщения на DelayHandler, след като контекстът на приложението е напълно инициализиран. Обърнете се към следния кодов фрагмент от DelayHandler.java
:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!this.initialized.getAndSet(true)) {
this.reschedulePersistedMessages();
}
}
Така че, ако съобщението вече присъства в опашката на RabbitMQ преди стартиране на приложението, по време на инициализацията на контекста на Spring съобщението се взема от опашката и се добавя към хранилището за съобщения на DelayHandler. След като инициализирането на контекста е направено и ако междувременно съобщението не бъде освободено от хранилището за съобщения, горният кодов фрагмент пренасрочва същото съобщение.
Сега, когато две отделни нишки изпълняват едно и също съобщение, ако едната нишка е изпълнена, тогава съобщението трябва да бъде изтрито от хранилището за съобщения и другата нишка не трябва да продължи с изпълнението.
Докато нишката се изпълнява, частта от кода по-долу от DelayHandler.java
позволява на втората нишка да освободи дублирано съобщение, което води до дублирано изпълнение за същото съобщение, тъй като хранилището за съобщения е екземпляр на SimpleMessageStore и няма допълнителна проверка за спиране на изпълнението .
private void doReleaseMessage(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore
|| ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
Това грешка ли е в Spring Integration?