Это моя текущая весенняя конфигурация amqp
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"/>
<rabbit:admin id="rabbitmqAdmin" connection-factory="rabbitConnectionFactory" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory">
</rabbit:template>
а это мои обмены, очереди, слушатели, responseQueues, конфигурация answerHandlers
<rabbit:queue name="${process1.queue}" />
<rabbit:queue name="${process1.reply.queue}" />
<rabbit:queue name="${process2.queue}" />
<rabbit:queue name="${process2.reply.queue}" />
<rabbit:direct-exchange name="${myExchange}">
<rabbit:bindings>
<rabbit:binding queue="${process1.queue}"
key="${process1.routing.key}" />
<rabbit:binding queue="${process2.queue}"
key="${process2.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${my.listener.concurrency}"
requeue-rejected="false">
<rabbit:listener queues="${process1.queue}"
ref="foundation" method="process1" />
<rabbit:listener queues="${process2.queue}"
ref="foundation" method="process2s" />
</rabbit:listener-container>
<beans:beans profile="master">
<beans:bean id="process1Lbq" class="java.util.concurrent.LinkedBlockingQueue" />
<beans:bean id="process2Lbq" class="java.util.concurrent.LinkedBlockingQueue" />
<beans:bean id="process1sReplyHandler"
class="com.stockopedia.batch.foundation.ReplyHandler"
p:blockingQueue-ref="process1Lbq" />
<beans:bean id="process2ReplyHandler"
class="com.stockopedia.batch.foundation.ReplyHandler"
p:blockingQueue-ref="process2Lbq" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="1"
requeue-rejected="false">
<rabbit:listener queues="${process1.reply.queue}"
ref="process1sHandler" method="onMessage" />
<rabbit:listener queues="${process2.reply.queue}"
ref="process2ReplyHandler" method="onMessage" />
</rabbit:listener-container>
</beans:beans>
Я установил это на 6 разных серверах и ставил в очередь сообщения только с главных серверов. Другие серверы только обрабатывают сообщения. На всех серверах работает одинаковое количество прослушивателей, заданное параллелизмом.
Проблема в том, что для обработки сообщений требуется разное время. Некоторые сообщения приходят долго. Таким образом, в настоящее время некоторые серверы не получают сообщения из очередей, даже все слушатели на этих серверах обрабатывают сообщения.
Я вижу ожидающие сообщения в очереди для обработки, а некоторые серверы просто простаивают. Я хочу, чтобы этот сервер собирал оставшиеся сообщения, пока другие серверы заняты обработкой своих сообщений.
Нужно ли мне устанавливать basic_Quos, как указано в руководстве http://www.rabbitmq.com/tutorials/tutorial-two-java.html (честная отправка) ?
int prefetchCount = 1;
channel.basicQos(prefetchCount);
или это по умолчанию для весеннего ampq? Если нет, как мне это сделать?