Pika: использовать следующее сообщение, даже если последнее сообщение не было подтверждено

Для автоматизации серверов мы пытаемся разработать инструмент, который может обрабатывать и выполнять множество задач на разных серверах. Мы отправляем задачу и имя хоста сервера в очередь. Затем очередь используется запросчиком, который передает информацию доступному API. Чтобы достичь этого, мы можем выполнять более одной задачи одновременно, мы используем многопоточность.

Теперь мы застряли с подтверждением сообщения...

Что мы сделали на данный момент:
requester.py потребляет очередь и затем запускает поток, в котором выполняется задача ansible. Затем результат отправляется в другую очередь. Таким образом, каждое новое сообщение создает новую ветку. Задача завершена, тред умирает.
Но теперь наступает трудная часть. Мы должны сделать сообщения постоянными на случай, если наш сервер умрет. Таким образом, каждое сообщение должно быть подтверждено после отправки результата из ansible.

Теперь наша проблема заключается в том, что когда мы пытаемся подтвердить сообщение в самом потоке, больше не выполняется «одновременная» работа, потому что consume pika ожидает подтверждения. Итак, как мы можем добиться, чтобы consume потреблял сообщения и не ждал подтверждения? Или как мы можем обойти или улучшить нашу маленькую программу?

requester.py

  #!/bin/python

  from worker import *
  import ansible.inventory
  import ansible.runner
  import threading

  class Requester(Worker):
      def __init__(self):
          Worker.__init__(self)
          self.connection(self.selfhost, self.from_db)
          self.receive(self.from_db)

      def send(self, result, ch, method):
          self.channel.basic_publish(exchange='',
                                     routing_key=self.to_db,
                                     body=result,
                                     properties=pika.BasicProperties(
                                                     delivery_mode=2,
                                     ))

          print "[x] Sent \n" + result
          ch.basic_ack(delivery_tag = method.delivery_tag)

      def callAnsible(self, cmd, ch, method):
          #call ansible api pre 2.0

          result =  json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
          self.send(result, ch, method)

      def callback(self, ch, method, properties, body):
          print(" [x] Received by requester %r" % body)
          t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
          t.start()

worker.py

  import pika
  import ConfigParser
  import json
  import os

  class Worker(object):
      def __init__(self):
          #read some config files

      def callback(self, ch, method, properties, body):
          raise Exception("Call method in subclass")

      def receive(self, queue):
          self.channel.basic_qos(prefetch_count=1)
          self.channel.basic_consume(self.callback,queue=queue)
          self.channel.start_consuming()

      def connection(self,server,queue):
          self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                 host=server,
                 credentials=self.credentials))
          self.channel = self.connection.channel()
          self.channel.queue_declare(queue=queue, durable=True)

Мы работаем с Python 2.7 и pika 0.10.0.

И да, мы заметили в FAQ по pika: http://pika.readthedocs.io/en/0.10.0/faq.html
что pika не является потокобезопасным.


person Rumpli    schedule 16.06.2016    source источник


Ответы (1)


Отключите автоматическое подтверждение и установите счетчик предварительной выборки больше 1, в зависимости от того, сколько сообщений вы хотите, чтобы ваш потребитель принял.

Вот как настроить предварительную выборку channel.basic_qos(prefetch_count=1), см. здесь.

person cantSleepNow    schedule 16.06.2016
comment
Потрясающий! Благодарю вас! Как я могу разогнать этот счетчик предварительной выборки. Это делает всю магию. - person Rumpli; 16.06.2016
comment
@Rumpli Я добавил это к ответу. Теперь я собираюсь пойти на свой собственный ущерб, но, поскольку вы здесь новичок, я кратко объясню о голосовании и принятии ответов: если ответ поможет вам, проголосуйте за него. Если это решит вашу проблему, проголосуйте за нее и примите. Здесь вы только приняли без голосования, но вы не пробовали, если это работает :) Может быть, просто проголосуйте сейчас, и как только вы подтвердите, примите также. Кто-нибудь, пожалуйста, поправьте меня, если я неправильно объяснил это голосование/принятие. - person cantSleepNow; 16.06.2016
comment
Спасибо за ваше объяснение. Я попробовал это, и, установив для «channel.basic_qos (prefetch_count = 1)» значение более «1», он одновременно выполняет более одной задачи. И я попытался проголосовать за ваш ответ, но пока у меня нет репутации 15, он не отобразит его... :( - person Rumpli; 17.06.2016