как установить basicQos в spring amqp для справедливой отправки?

Это моя текущая весенняя конфигурация amqp

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"/>

<rabbit:admin id="rabbitmqAdmin" connection-factory="rabbitConnectionFactory" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory">
</rabbit:template>

а это мои обмены, очереди, слушатели, responseQueues, конфигурация answerHandlers

<rabbit:queue name="${process1.queue}" />
<rabbit:queue name="${process1.reply.queue}" />

<rabbit:queue name="${process2.queue}" />
<rabbit:queue name="${process2.reply.queue}" />

<rabbit:direct-exchange name="${myExchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${process1.queue}"
            key="${process1.routing.key}" />
        <rabbit:binding queue="${process2.queue}"
            key="${process2.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${my.listener.concurrency}"
    requeue-rejected="false">
    <rabbit:listener queues="${process1.queue}"
        ref="foundation" method="process1" />
    <rabbit:listener queues="${process2.queue}"
        ref="foundation" method="process2s" />
</rabbit:listener-container>


<beans:beans profile="master">

    <beans:bean id="process1Lbq" class="java.util.concurrent.LinkedBlockingQueue" />
    <beans:bean id="process2Lbq" class="java.util.concurrent.LinkedBlockingQueue" />

    <beans:bean id="process1sReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process1Lbq" />

    <beans:bean id="process2ReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process2Lbq" />

    <rabbit:listener-container
        connection-factory="rabbitConnectionFactory" concurrency="1"
        requeue-rejected="false">
        <rabbit:listener queues="${process1.reply.queue}"
            ref="process1sHandler" method="onMessage" />
        <rabbit:listener queues="${process2.reply.queue}"
            ref="process2ReplyHandler" method="onMessage" />
    </rabbit:listener-container>

</beans:beans>

Я установил это на 6 разных серверах и ставил в очередь сообщения только с главных серверов. Другие серверы только обрабатывают сообщения. На всех серверах работает одинаковое количество прослушивателей, заданное параллелизмом.

Проблема в том, что для обработки сообщений требуется разное время. Некоторые сообщения приходят долго. Таким образом, в настоящее время некоторые серверы не получают сообщения из очередей, даже все слушатели на этих серверах обрабатывают сообщения.

Я вижу ожидающие сообщения в очереди для обработки, а некоторые серверы просто простаивают. Я хочу, чтобы этот сервер собирал оставшиеся сообщения, пока другие серверы заняты обработкой своих сообщений.

Нужно ли мне устанавливать basic_Quos, как указано в руководстве http://www.rabbitmq.com/tutorials/tutorial-two-java.html (честная отправка) ?

int prefetchCount = 1;
channel.basicQos(prefetchCount);

или это по умолчанию для весеннего ampq? Если нет, как мне это сделать?


person vishal    schedule 19.05.2014    source источник


Ответы (1)


basicQos(1) — это настройка по умолчанию для контейнера прослушивателя; его можно изменить, установив prefetch в контейнере.

Я вижу ожидающие сообщения в очереди для обработки, а некоторые серверы просто простаивают.

Вы не должны видеть сообщения, просто находящиеся в очереди, если у вас есть незанятые потребители. Если сообщения помечены как неподтвержденные, они обрабатываются.

Если вы включите ведение журнала на уровне DEBUG, вы сможете увидеть простаивающих потребителей, опрашивающих внутреннюю очередь для новых доставок.

person Gary Russell    schedule 19.05.2014