Как справиться с большой загрузкой сообщений в Spring Integration?

Внешний модуль отправляет тысячи сообщений брокеру сообщений. Каждое сообщение имеет свойство TimeToLive, равное 5 секундам. Другой модуль должен потреблять и обрабатывать ВСЕ сообщения.

Из документации Spring Integration я обнаружил, что поэтапная архитектура, управляемая событиями (потребители), лучше реагирует на значительные всплески нагрузки.

Моя текущая реализация использует EDA (даже управляемую архитектуру), например.

<si:channel id="inputChannel"/>

<!-- get messages from PRESENCE_ENGINE queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="inputChannel" destination="sso" connection-factory="connectionFactory"  
    max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/> 

<bean id="messageService" class="com.my.messaging.MessageService"/>

<bean id="sso" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SSO" />
</bean> 

Очевидно, из-за большой нагрузки, например. входящие тысячи сообщений, processMessage () может занять более 5 секунд. и MessageService может не обрабатывать все сообщения.

Мои идеи следующие:

  1. Измените processMessage () так, чтобы сообщение, а не обрабатываемое, хранилось только в MongoDB. Тогда я мог бы самостоятельно обрабатывать сообщения в отдельной задаче. В таком сценарии MongoDB будет служить кешем.

  2. Используйте большое количество потребителей (модель SEDA). InputChannel - это прямой канал.

  3. Асинхронно обрабатывать сообщения, например inputChannel - это канал очереди, и сообщения обрабатываются асинхронно.

Прежде чем принять решение, хочу спросить, какой сценарий более эффективен. Возможно, сценарии 2) и 3) предоставляют механизм для удовлетворения моего требования, чтобы ВСЕ сообщения обрабатывались даже при больших нагрузках.

РЕДАКТИРОВАТЬ:

Я уже реализовал сценарий 2, в котором я продолжаю отправлять 1000 сообщений в секунду. Это статистика, сколько сообщений пропало с разными параметрами:

max-concurrent-потребители; TimeToLive = 5 сек .; Лимит простоя-потребителя; # отправленных сообщений; # полученных сообщений

 10 ; Yes ; 1   ; 1001 ; 297
100 ; Yes ; 1   ; 1001 ; 861
150 ; Yes ; 1   ; 1001 ; 859
300 ; Yes ; 1   ; 1001 ; 861
300 ; No  ; 1   ; 1001 ; 860
300 ; No  ; 100 ; 1001 ; 1014
300 ; No  ; 50  ; 1001 ; 1011

Кажется, что idle-consumer-limit создает потребителей более агрессивно, чем max-concurrent потребителей. Это хороший подход для использования предела простоя-потребителя в таком сценарии?

Это мои файлы конфигурации для отправителя / потребителя:

<!-- SENDER  
Keep Alive Sender sends messages to backup server -->    

<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>

<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage"> 
    <bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
    <si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>    

<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
    <bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>

<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter  explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}" 
    channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>

<bean id="pres" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="PRES" />
</bean>


<!-- RECEIVER -->

<si:channel id="receiveChannel"/>

<!-- get messages from PRES queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="receiveChannel" destination="presence" connection-factory="connectionFactory"  idle-consumer-limit="50" 
    max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/> 


<bean id="messageService" class="com.cache.MessageService"/>

person luksmir    schedule 14.11.2013    source источник
comment
Кажется, у вас странное сочетание требований. Ограниченное время жизни в сочетании с необходимой доставкой доставит вам проблемы. Есть ли причина, по которой вы не можете убрать время, чтобы жить? Или настроить брокера для помещения их в постоянную очередь?   -  person Steve    schedule 14.11.2013
comment
В самом деле, это требование мы могли бы опустить. Но мы должны быть уверены, что ВСЕ сообщения потребляются. Мои первоначальные тесты в сценарии 2 показали, что для 1000 сообщений, отправленных в течение минуты, было получено только 860 сообщений (с включенным и выключенным временем жизни).   -  person luksmir    schedule 15.11.2013
comment
Звучит странно ... брокеры сообщений должны быть в состоянии гарантировать, что сообщения не будут пропущены.   -  person Steve    schedule 15.11.2013


Ответы (1)


Прежде всего, вы можете попробовать поиграть со свойством max-concurrent-consumers. Как видите, в вашем случае 1 действительно мало. Вы должны выяснить, почему ваш MessageService так медленно. Любые другие случаи выглядят как накладные расходы, потому что JMS уже является постоянным и асинхронным - на основе очереди. Если это не помогает, используйте <queue> канал с присутствием MessageStore, например MongoDB

person Artem Bilan    schedule 14.11.2013
comment
Я увеличил количество max-concurent потребителей, и это помогло, но только до определенной степени (см. Мое обновление). +1 за предложение. - person luksmir; 15.11.2013