как да задам basicQos в пролетния amqp за справедливо изпращане?

Това е текущата ми пролетна amqp конфигурация

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"/>

<rabbit:admin id="rabbitmqAdmin" connection-factory="rabbitConnectionFactory" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory">
</rabbit:template>

и това е моята конфигурация за обмен, опашки, слушатели, replyQueues, replyHandlers

<rabbit:queue name="${process1.queue}" />
<rabbit:queue name="${process1.reply.queue}" />

<rabbit:queue name="${process2.queue}" />
<rabbit:queue name="${process2.reply.queue}" />

<rabbit:direct-exchange name="${myExchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${process1.queue}"
            key="${process1.routing.key}" />
        <rabbit:binding queue="${process2.queue}"
            key="${process2.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${my.listener.concurrency}"
    requeue-rejected="false">
    <rabbit:listener queues="${process1.queue}"
        ref="foundation" method="process1" />
    <rabbit:listener queues="${process2.queue}"
        ref="foundation" method="process2s" />
</rabbit:listener-container>


<beans:beans profile="master">

    <beans:bean id="process1Lbq" class="java.util.concurrent.LinkedBlockingQueue" />
    <beans:bean id="process2Lbq" class="java.util.concurrent.LinkedBlockingQueue" />

    <beans:bean id="process1sReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process1Lbq" />

    <beans:bean id="process2ReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process2Lbq" />

    <rabbit:listener-container
        connection-factory="rabbitConnectionFactory" concurrency="1"
        requeue-rejected="false">
        <rabbit:listener queues="${process1.reply.queue}"
            ref="process1sHandler" method="onMessage" />
        <rabbit:listener queues="${process2.reply.queue}"
            ref="process2ReplyHandler" method="onMessage" />
    </rabbit:listener-container>

</beans:beans>

Настроил съм това на 6 различни сървъра и поставям на опашка съобщения само от главните сървъри. Други сървъри обработват само съобщения. Всички сървъри имат еднакъв брой работещи слушатели, както е зададено от паралелността.

Проблемът е, че съобщенията отнемат различно време за обработка. Някои съобщения отнемат много време. Така че в момента някои от сървърите не прихващат съобщения от опашки, дори всички слушатели на тези сървъри са готови с обработката на съобщенията там.

Виждам чакащите съобщения в опашката за обработка и някои сървъри просто не работят. Искам тези сървъри да вземат оставащите съобщения, докато другите сървъри са заети с обработката на техните съобщения.

Трябва ли да задам basic_Quos, както е споменато в урока http://www.rabbitmq.com/tutorials/tutorial-two-java.html (Честно изпращане) ?

int prefetchCount = 1;
channel.basicQos(prefetchCount);

или е по подразбиране за spring ampq? Ако не, как да го направя?


person vishal    schedule 19.05.2014    source източник


Отговори (1)


basicQos(1) е настройката по подразбиране за контейнера слушател; може да се промени чрез настройка на prefetch на контейнера.

Виждам чакащите съобщения в опашката за обработка и някои сървъри просто не работят.

Не трябва да виждате съобщения, които просто стоят в опашката, ако имате неактивни потребители. Ако съобщенията са маркирани като непроверени, те се обработват.

Ако включите регистриране на ниво DEBUG, ще можете да видите неактивни потребители, които проверяват вътрешна опашка за нови доставки.

person Gary Russell    schedule 19.05.2014