Для автоматизации серверов мы пытаемся разработать инструмент, который может обрабатывать и выполнять множество задач на разных серверах. Мы отправляем задачу и имя хоста сервера в очередь. Затем очередь используется запросчиком, который передает информацию доступному API. Чтобы достичь этого, мы можем выполнять более одной задачи одновременно, мы используем многопоточность.
Теперь мы застряли с подтверждением сообщения...
Что мы сделали на данный момент:requester.py
потребляет очередь и затем запускает поток, в котором выполняется задача ansible. Затем результат отправляется в другую очередь. Таким образом, каждое новое сообщение создает новую ветку. Задача завершена, тред умирает.
Но теперь наступает трудная часть. Мы должны сделать сообщения постоянными на случай, если наш сервер умрет. Таким образом, каждое сообщение должно быть подтверждено после отправки результата из ansible.
Теперь наша проблема заключается в том, что когда мы пытаемся подтвердить сообщение в самом потоке, больше не выполняется «одновременная» работа, потому что consume
pika ожидает подтверждения. Итак, как мы можем добиться, чтобы consume
потреблял сообщения и не ждал подтверждения? Или как мы можем обойти или улучшить нашу маленькую программу?
requester.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
Мы работаем с Python 2.7 и pika 0.10.0.
И да, мы заметили в FAQ по pika: http://pika.readthedocs.io/en/0.10.0/faq.html
что pika не является потокобезопасным.