Как да настроя потребител на RabbitMQ да консумира от непразна опашка?

В момента работя с RabbitMQ в Python, използвайки клиента Pika, за да създам сървър, който обработва различни типове съобщения. Основната настройка, която имам, е една опашка, която получава всички входящи съобщения, процес на маршрутизиране, който ги насочва към правилните дестинации, и няколко процеса за обработка на заявки и приемане на входящи данни. Тази настройка работи добре, освен в един конкретен случай. Когато сървърът RabbitMQ работи, преди процесите на сървъра да бъдат стартирани и той получи съобщение, той правилно ги съхранява в опашката за входящи съобщения. Въпреки това, когато след това се опитам да стартирам тези процеси и да настроя потребител на тази непразна входяща опашка с функцията pika.basic_consume, програмата увисва. Така че в момента, ако искам да стартирам сървърните си процеси, трябва да изчистя всички съобщения от опашките, преди да работи правилно. Как да поправя това, за да работи с непразни опашки?


Ето извадка от един от процесите, всички те са настроени по същество еднакви като този.

class Router(Process):

    def __init__(self,routing_table):
        super(Router,self).__init__()
        self.routing_table = routing_table

        self.routeQueues    = {
            'r' : 'registration',
            't' : 'util',
            'p' : 'util',
            's' : 'data'
        }

        # Create a connection to the RabbitMQ server.
        self.rabbitConn =  pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.rabbitConn.channel()

        # Load all of the existing registered node queues
        with open('registrations/nodes.txt','r') as nodes:
            for line in nodes:
                info = line.strip().split(":")
                self.channel.queue_declare(info[1])

        # Declare the default queues
        queue_list = ["incoming","registration","util"]
        for queueName in queue_list:
            self.channel.queue_declare(queueName)

        # Start consuming things from the incoming queue
        self.channel.basic_consume(self.gotPacket,queue='incoming')

    def gotPacket(self,ch,method,params,body):
        # Does stuff. Not relevant here.
        pass

   def run(self):
       self.channel.start_consuming()

person Firebarrage    schedule 30.06.2015    source източник
comment
Това не би трябвало да е проблем. Бих ви препоръчал да опитате да надстроите до най-новата версия на pika. Като алтернатива опитайте алтернативна библиотека като rabbitpy или моя собствен amqp-storm.   -  person eandersson    schedule 07.07.2015
comment
@eandersson Използвам pika 0.9.13, най-новата версия. Ако изобщо е възможно, бих предпочел да продължа да използвам pika, тъй като на този етап би било огромно начинание да премина към друга библиотека.   -  person Firebarrage    schedule 10.07.2015
comment
0.9.14 беше пуснат преди около година, което поправи много проблеми.   -  person eandersson    schedule 10.07.2015
comment
@eandersson Оказа се, че това е! Pip твърдеше, че имам най-новата версия по някаква странна причина, малко бъркане и го накарах, за да инсталирам по-новата. Благодаря!   -  person Firebarrage    schedule 13.07.2015


Отговори (1)


Този проблем е причинен от библиотеката pika 0.9.13. Надграждането до pika 0.9.14 решава този проблем. @eandersson

person Firebarrage    schedule 13.07.2015