Настройка Rabbit MQ Heartbeat с Kombu

Изменить:

Основная проблема заключается в том, что сторонняя машина rabbitmq, похоже, время от времени убивает простаивающие соединения. Именно тогда я начинаю получать исключения «Broken Pipe». Единственный способ получить связь. вернуться в нормальное состояние для меня, чтобы убить процессы и перезапустить их. Я предполагаю, что есть лучший способ?

--

Я немного потерялся здесь. Я подключаюсь к стороннему серверу RabbitMQ для отправки сообщений. Время от времени все сокеты на их машине сбрасываются, и в итоге я получаю исключение «Broken Pipe».

Мне сказали реализовать проверку сердцебиения в моем коде, но я не уверен, как именно. Я нашел некоторую информацию здесь: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0, но без реального примера кода.

Мне нужно только добавить «?heartbeat=x» в строку подключения? Комбу делает все остальное? Я вижу, что мне нужно вызвать "Connection.heartbeat_check()" в "x/2". Должен ли я создать периодическую задачу для вызова этого? Как восстановить соединение?

Я использую:

  • сельдерей == 3.0.12
  • комбу == 2.5.4

Мой код выглядит так прямо сейчас. Вызывается простая задача Celery для отправки сообщения на сторонний сервер RabbitMQ (удалено ведение журнала и комментарии, чтобы сделать его коротким, достаточно простым):

class SendMessageTask(Task):
    name = "campaign.backends.send"
    routing_key = "campaign.backends.send"
    ignore_result = True
    default_retry_delay = 60 # 1 minute.
    max_retries = 5

    def run(self, send_to, message, **kwargs):
    payload = "Testing message"

    try:
        conn = BrokerConnection(
        hostname=HOSTNAME,
        port=PORT,
        userid=USER_ID,
        password=PASSWORD,
        virtual_host=VHOST
        )

        with producers[conn].acquire(block=True) as producer:
        publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
        publish(
            body=payload,
            routing_key=OUT_ROUTING_KEY,
            delivery_mode=2,
            exchange=EXCHANGE,
            serializer=None,
            content_type='text/xml',
            content_encoding = 'utf-8'
        )

    except Exception, ex:
        print ex

Спасибо за любую помощь.


person Christopher Penkin    schedule 29.01.2013    source источник


Ответы (1)


Хотя вы, безусловно, можете добавить поддержку пульса для производителя, это имеет больше смысла для процессов-потребителей.

Включение пульсаций означает, что вы должны регулярно отправлять пульсации, например. если пульс установлен на 1 секунду, то вы должны отправлять пульс каждую секунду или чаще, иначе пульт закроет соединение.

Это означает, что вам нужно использовать отдельный поток или использовать асинхронный ввод-вывод для надежной своевременной отправки тактов, а поскольку соединение нельзя разделить между потоками, остается асинхронный ввод-вывод.

Хорошей новостью является то, что вы, вероятно, не получите большой выгоды, добавляя сердцебиения к соединению, предназначенному только для производства.

person asksol    schedule 29.01.2013
comment
Достаточно справедливо :) Каков наилучший способ обработки исключения Broken Pipe, которое я получаю время от времени? Есть ли способ проверить соединение, если оно не работает, переподключиться? Спасибо еще раз. - person Christopher Penkin; 29.01.2013
comment
Я думаю, что этот ответ в другой теме может дать вам некоторые подсказки? groups.google.com/forum/?fromgroups=#! тема/пользователи-морковь/ - person asksol; 31.01.2013
comment
Это похоже на то, что мне нужно. Внес изменения в код, дайте знать, если он позаботится о моих проблемах с подключением :) - person Christopher Penkin; 31.01.2013
comment
Круто, вроде работает. Больше не было исключений Broken Pipe с момента реализации моего Producer, как показано в группе Google. Спасибо @asksol. - person Christopher Penkin; 02.02.2013