Внешний модуль отправляет тысячи сообщений брокеру сообщений. Каждое сообщение имеет свойство TimeToLive, равное 5 секундам. Другой модуль должен потреблять и обрабатывать ВСЕ сообщения.
Из документации Spring Integration я обнаружил, что поэтапная архитектура, управляемая событиями (потребители), лучше реагирует на значительные всплески нагрузки.
Моя текущая реализация использует EDA (даже управляемую архитектуру), например.
<si:channel id="inputChannel"/>
<!-- get messages from PRESENCE_ENGINE queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="inputChannel" destination="sso" connection-factory="connectionFactory"
max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.my.messaging.MessageService"/>
<bean id="sso"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SSO" />
</bean>
Очевидно, из-за большой нагрузки, например. входящие тысячи сообщений, processMessage () может занять более 5 секунд. и MessageService может не обрабатывать все сообщения.
Мои идеи следующие:
Измените processMessage () так, чтобы сообщение, а не обрабатываемое, хранилось только в MongoDB. Тогда я мог бы самостоятельно обрабатывать сообщения в отдельной задаче. В таком сценарии MongoDB будет служить кешем.
Используйте большое количество потребителей (модель SEDA). InputChannel - это прямой канал.
- Асинхронно обрабатывать сообщения, например inputChannel - это канал очереди, и сообщения обрабатываются асинхронно.
Прежде чем принять решение, хочу спросить, какой сценарий более эффективен. Возможно, сценарии 2) и 3) предоставляют механизм для удовлетворения моего требования, чтобы ВСЕ сообщения обрабатывались даже при больших нагрузках.
РЕДАКТИРОВАТЬ:
Я уже реализовал сценарий 2, в котором я продолжаю отправлять 1000 сообщений в секунду. Это статистика, сколько сообщений пропало с разными параметрами:
max-concurrent-потребители; TimeToLive = 5 сек .; Лимит простоя-потребителя; # отправленных сообщений; # полученных сообщений
10 ; Yes ; 1 ; 1001 ; 297
100 ; Yes ; 1 ; 1001 ; 861
150 ; Yes ; 1 ; 1001 ; 859
300 ; Yes ; 1 ; 1001 ; 861
300 ; No ; 1 ; 1001 ; 860
300 ; No ; 100 ; 1001 ; 1014
300 ; No ; 50 ; 1001 ; 1011
Кажется, что idle-consumer-limit создает потребителей более агрессивно, чем max-concurrent потребителей. Это хороший подход для использования предела простоя-потребителя в таком сценарии?
Это мои файлы конфигурации для отправителя / потребителя:
<!-- SENDER
Keep Alive Sender sends messages to backup server -->
<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>
<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage">
<bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
<si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>
<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
<bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>
<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}"
channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>
<bean id="pres"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="PRES" />
</bean>
<!-- RECEIVER -->
<si:channel id="receiveChannel"/>
<!-- get messages from PRES queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="receiveChannel" destination="presence" connection-factory="connectionFactory" idle-consumer-limit="50"
max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.cache.MessageService"/>