Изменить:
Основная проблема заключается в том, что сторонняя машина rabbitmq, похоже, время от времени убивает простаивающие соединения. Именно тогда я начинаю получать исключения «Broken Pipe». Единственный способ получить связь. вернуться в нормальное состояние для меня, чтобы убить процессы и перезапустить их. Я предполагаю, что есть лучший способ?
--
Я немного потерялся здесь. Я подключаюсь к стороннему серверу RabbitMQ для отправки сообщений. Время от времени все сокеты на их машине сбрасываются, и в итоге я получаю исключение «Broken Pipe».
Мне сказали реализовать проверку сердцебиения в моем коде, но я не уверен, как именно. Я нашел некоторую информацию здесь: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0, но без реального примера кода.
Мне нужно только добавить «?heartbeat=x» в строку подключения? Комбу делает все остальное? Я вижу, что мне нужно вызвать "Connection.heartbeat_check()" в "x/2". Должен ли я создать периодическую задачу для вызова этого? Как восстановить соединение?
Я использую:
- сельдерей == 3.0.12
- комбу == 2.5.4
Мой код выглядит так прямо сейчас. Вызывается простая задача Celery для отправки сообщения на сторонний сервер RabbitMQ (удалено ведение журнала и комментарии, чтобы сделать его коротким, достаточно простым):
class SendMessageTask(Task):
name = "campaign.backends.send"
routing_key = "campaign.backends.send"
ignore_result = True
default_retry_delay = 60 # 1 minute.
max_retries = 5
def run(self, send_to, message, **kwargs):
payload = "Testing message"
try:
conn = BrokerConnection(
hostname=HOSTNAME,
port=PORT,
userid=USER_ID,
password=PASSWORD,
virtual_host=VHOST
)
with producers[conn].acquire(block=True) as producer:
publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
publish(
body=payload,
routing_key=OUT_ROUTING_KEY,
delivery_mode=2,
exchange=EXCHANGE,
serializer=None,
content_type='text/xml',
content_encoding = 'utf-8'
)
except Exception, ex:
print ex
Спасибо за любую помощь.