Как настроить потребителя RabbitMQ для потребления из непустой очереди?

В настоящее время я работаю с RabbitMQ в Python, используя клиент Pika для создания сервера, который обрабатывает различные типы сообщений. Базовая установка, которую я имею, — это одна очередь, получающая все входящие сообщения, процесс маршрутизации, который направляет их в правильные места назначения, и несколько процессов для обработки запросов и приема входящих данных. Эта установка работала нормально, за исключением одного конкретного случая. Когда у меня работает сервер RabbitMQ до запуска серверных процессов и он получает сообщение, он правильно сохраняет их в очереди входящих сообщений. Однако, когда я затем пытаюсь запустить эти процессы и настроить потребителя на эту непустую входящую очередь с помощью функции pika.basic_consume, программа зависает. Итак, на данный момент, если я хочу запустить свои серверные процессы, я должен очистить все сообщения из очередей, прежде чем он заработает правильно. Как исправить это для работы с непустыми очередями?


Вот пример одного из процессов, все они настроены практически так же, как и этот.

class Router(Process):

    def __init__(self,routing_table):
        super(Router,self).__init__()
        self.routing_table = routing_table

        self.routeQueues    = {
            'r' : 'registration',
            't' : 'util',
            'p' : 'util',
            's' : 'data'
        }

        # Create a connection to the RabbitMQ server.
        self.rabbitConn =  pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.rabbitConn.channel()

        # Load all of the existing registered node queues
        with open('registrations/nodes.txt','r') as nodes:
            for line in nodes:
                info = line.strip().split(":")
                self.channel.queue_declare(info[1])

        # Declare the default queues
        queue_list = ["incoming","registration","util"]
        for queueName in queue_list:
            self.channel.queue_declare(queueName)

        # Start consuming things from the incoming queue
        self.channel.basic_consume(self.gotPacket,queue='incoming')

    def gotPacket(self,ch,method,params,body):
        # Does stuff. Not relevant here.
        pass

   def run(self):
       self.channel.start_consuming()

person Firebarrage    schedule 30.06.2015    source источник
comment
Это не должно быть проблемой. Я бы порекомендовал вам попробовать обновиться до последней версии pika. В качестве альтернативы попробуйте альтернативную библиотеку, например rabbitpy или мою собственную amqp-storm.   -  person eandersson    schedule 07.07.2015
comment
@eandersson Я использую последнюю версию pika 0.9.13. Если это вообще возможно, я бы предпочел продолжать использовать pika, так как на этом этапе было бы очень сложно переключиться на другую библиотеку.   -  person Firebarrage    schedule 10.07.2015
comment
Около года назад вышла версия 0.9.14, в которой было исправлено множество проблем.   -  person eandersson    schedule 10.07.2015
comment
@eandersson Оказывается, это было так! Пип утверждал, что у меня есть последняя версия по какой-то странной причине, немного возни, и я получил ее, чтобы установить более новую. Спасибо!   -  person Firebarrage    schedule 13.07.2015


Ответы (1)


Эта проблема была вызвана библиотекой pika 0.9.13. Обновление до pika 0.9.14 решает эту проблему. @еандерссон

person Firebarrage    schedule 13.07.2015