Python и RabbitMQ — лучший способ прослушивания событий из нескольких каналов?

У меня есть два отдельных экземпляра RabbitMQ. Я пытаюсь найти лучший способ прослушивания событий от обоих.

Например, я могу использовать события на одном со следующим:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

У меня есть второй хост, "host2", который я тоже хотел бы послушать. Я думал о создании двух отдельных потоков для этого, но из того, что я читал, pika не является потокобезопасным. Есть ли способ лучше? Или будет достаточно создать два отдельных потока, каждый из которых слушает разные экземпляры Rabbit (host1 и host2)?


person blindsnowmobile    schedule 16.02.2015    source источник


Ответы (3)


Ответ на вопрос «какой способ лучше всего» сильно зависит от вашего шаблона использования очередей и от того, что вы подразумеваете под «лучшим». Поскольку я пока не могу комментировать вопросы, я просто попытаюсь предложить некоторые возможные решения.

В каждом примере я буду предполагать, что exchange уже объявлен.

Потоки

Вы можете использовать сообщения из двух очередей на разных хостах в одном процессе, используя pika.

Вы правы — как указано в часто задаваемых вопросах, pika не является потокобезопасным, но его можно использовать в многопоточном режиме, создавая подключения к узлам RabbitMQ для каждого потока. Запуск этого примера в потоках с использованием модуля threading выглядит следующим образом:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

Я объявил callback_func исключительно для использования ConsumerThread.name при печати тела сообщения. С таким же успехом это может быть функция вне класса ConsumerThread.

Процессы

В качестве альтернативы вы всегда можете просто запустить один процесс с потребительским кодом для каждой очереди, в которой вы хотите использовать события.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

а затем запустите:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

Если работа, которую вы выполняете с сообщениями из очередей, нагружает ЦП и количество ядер в вашем ЦП >= количество потребителей, как правило, лучше использовать этот подход, если только ваши очереди не пусты большую часть времени и потребители не будут использовать это время ЦП*.

Асинхронный

Другой альтернативой является использование некоторой асинхронной среды (например, Twisted) и запуск всего в одном потоке.

Вы больше не можете использовать BlockingConnection в асинхронном коде; к счастью, у pika есть адаптер для Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

Этот подход был бы даже лучше, если бы из большего числа очередей вы бы потребляли и тем меньше ЦП выполняла бы работа, выполняемая потребителями*.

Питон 3

Поскольку вы упомянули pika, я ограничился решениями на основе Python 2.x, потому что pika еще не перенесен.

Но если вы захотите перейти на >=3.3, один из возможных вариантов — использовать asyncio< /a> с одним из протоколов AMQP (протокол, по которому вы говорите с RabbitMQ), например. asynqp или aioamqp.

* - обратите внимание, что это очень поверхностные подсказки - в большинстве случаев выбор не так очевиден; что будет лучше для вас, зависит от «насыщенности» очередей (сообщения/время), какую работу вы выполняете при получении этих сообщений, в какой среде вы запускаете своих потребителей и т.д.; нет другого способа убедиться, кроме как сравнить все реализации

person Unit03    schedule 17.02.2015
comment
Привет @ Unit03 Я знаю, что вы ответили в 2015 году. Я использую тот же витой адаптер, и у меня пропадает слишком много сердечных сокращений. Вы можете посетить мой вопрос для получения кода и дополнительного описания. stackoverflow.com/questions/62024116/ - person Vaibhav Mishra; 27.05.2020
comment
Привет, Вайбхав, я ответил на этот вопрос. - person Unit03; 30.05.2020

Ниже приведен пример того, как я использую один экземпляр rabbitmq для одновременного прослушивания двух очередей:

import pika
import threading

threads=[]
def client_info(channel):    
   channel.queue_declare(queue='proxy-python')
   print (' [*] Waiting for client messages. To exit press CTRL+C')


   def callback(ch, method, properties, body):
       print (" Received %s" % (body))

   channel.basic_consume(callback, queue='proxy-python', no_ack=True)
   channel.start_consuming()

def scenario_info(channel):    
   channel.queue_declare(queue='savi-virnet-python')
   print (' [*] Waiting for scenrio messages. To exit press CTRL+C')


   def callback(ch, method, properties, body):
      print (" Received %s" % (body))

   channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
   channel.start_consuming()

def manager():
   connection1= pika.BlockingConnection(pika.ConnectionParameters
  (host='localhost'))
   channel1 = connection1.channel()
  connection2= pika.BlockingConnection(pika.ConnectionParameters
  (host='localhost'))
   channel2 = connection2.channel()
   t1 = threading.Thread(target=client_info, args=(channel1,))
   t1.daemon = True
   threads.append(t1)
   t1.start()  

   t2 = threading.Thread(target=scenario_info, args=(channel2,))
   t2.daemon = True
   threads.append(t2)


   t2.start()
   for t in threads:
     t.join()


 manager()
person NasimBM    schedule 04.07.2016

import asyncio
import tornado.ioloop
import tornado.web

from aio_pika import connect_robust, Message

tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)

QUEUE = asyncio.Queue()


class SubscriberHandler(tornado.web.RequestHandler):
    async def get(self):
        message = await QUEUE.get()
        self.finish(message.body)


class PublisherHandler(tornado.web.RequestHandler):
    async def post(self):
        connection = self.application.settings["amqp_connection"]
        channel = await connection.channel()
        try:
            await channel.default_exchange.publish(
                Message(body=self.request.body), routing_key="test",
            )
        finally:
            await channel.close()
            print('ok')
        self.finish("OK")

async def make_app():
    amqp_connection = await connect_robust()
    channel = await amqp_connection.channel()
    queue = await channel.declare_queue("test", auto_delete=True)
    await queue.consume(QUEUE.put, no_ack=True)
    return tornado.web.Application(
        [(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
        amqp_connection=amqp_connection,
    )

if __name__ == "__main__":
    app = io_loop.asyncio_loop.run_until_complete(make_app())
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Вы можете использовать aio-pika в асинхронном режиме. Больше примеров здесь https://buildmedia.readthedocs.org/media/pdf/aio-pika/latest/aio-pika.pdf

Удачного кодирования :)

person Divek John    schedule 15.04.2021