Пролетна интеграция: Съобщението е пуснато два пъти след забавяне

Използвам следния XML фрагмент:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

Изпълнява тези задачи:

  1. Консумирайте съобщения от опашката на RabbitMQ, наречени „команди“.
  2. Забавяне на изпълнението на съобщението с 30 секунди.
  3. Продължете с по-нататъшно изпълнение за съобщение след определеното забавяне.

Ако съобщението вече присъства в опашката с команди, преди приложението с горния код да бъде стартирано, при стартиране приложението изпълнява съобщението два пъти в отделни нишки.

Мисля, че знам защо се случва това.

Spring пренасрочва съобщенията, които се съхраняват в хранилището за съобщения на DelayHandler, след като контекстът на приложението е напълно инициализиран. Обърнете се към следния кодов фрагмент от DelayHandler.java:

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

Така че, ако съобщението вече присъства в опашката на RabbitMQ преди стартиране на приложението, по време на инициализацията на контекста на Spring съобщението се взема от опашката и се добавя към хранилището за съобщения на DelayHandler. След като инициализирането на контекста е направено и ако междувременно съобщението не бъде освободено от хранилището за съобщения, горният кодов фрагмент пренасрочва същото съобщение.

Сега, когато две отделни нишки изпълняват едно и също съобщение, ако едната нишка е изпълнена, тогава съобщението трябва да бъде изтрито от хранилището за съобщения и другата нишка не трябва да продължи с изпълнението.

Докато нишката се изпълнява, частта от кода по-долу от DelayHandler.java позволява на втората нишка да освободи дублирано съобщение, което води до дублирано изпълнение за същото съобщение, тъй като хранилището за съобщения е екземпляр на SimpleMessageStore и няма допълнителна проверка за спиране на изпълнението .

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

Това грешка ли е в Spring Integration?


person Saurav Garg    schedule 12.11.2014    source източник


Отговори (1)


Актуализирането до най-новата версия на NuGet Package Manager (2.6.40627) реши проблема за мен.
person Artem Bilan    schedule 13.11.2014