Настройване на Rabbit MQ Heartbeat с Kombu

Редактиране:

Основният проблем е, че машината rabbitmq на трета страна изглежда убива неактивните връзки от време на време. Тогава започвам да получавам изключения „Спукана тръба“. Единственият начин да получите комуникации. обратно към нормалното е за мен да убия процесите и да ги рестартирам. Предполагам, че има по-добър начин?

--

Тук съм малко изгубен. Свързвам се със сървър на RabbitMQ на трета страна, към който да изпращам съобщения. От време на време всички гнезда на тяхната машина се изпускат и в крайна сметка получавам изключение „Спукана тръба“.

Казаха ми да внедря проверка на сърдечния ритъм в моя код, но не съм сигурен как точно. Намерих малко информация тук: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0, но няма истински примерен код.

Трябва ли само да добавя "?heartbeat=x" към низа за връзка? Kombu ли прави останалото? Виждам, че трябва да извикам "Connection.heartbeat_check()" на "x/2". Трябва ли да създам периодична задача, за да извикам това? Как се възстановява връзката?

Аз използвам:

  • целина==3.0.12
  • kombu==2.5.4

Моят код изглежда така в момента. Извиква се проста задача на Celery, за да изпрати съобщението до сървъра RabbitMQ на трета страна (премахнато регистриране и коментари, за да бъде кратко, достатъчно основно):

class SendMessageTask(Task):
    name = "campaign.backends.send"
    routing_key = "campaign.backends.send"
    ignore_result = True
    default_retry_delay = 60 # 1 minute.
    max_retries = 5

    def run(self, send_to, message, **kwargs):
    payload = "Testing message"

    try:
        conn = BrokerConnection(
        hostname=HOSTNAME,
        port=PORT,
        userid=USER_ID,
        password=PASSWORD,
        virtual_host=VHOST
        )

        with producers[conn].acquire(block=True) as producer:
        publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
        publish(
            body=payload,
            routing_key=OUT_ROUTING_KEY,
            delivery_mode=2,
            exchange=EXCHANGE,
            serializer=None,
            content_type='text/xml',
            content_encoding = 'utf-8'
        )

    except Exception, ex:
        print ex

Благодаря за всяка помощ.


person Christopher Penkin    schedule 29.01.2013    source източник


Отговори (1)


Въпреки че със сигурност можете да добавите сърдечна поддръжка към продуцент, това има повече смисъл за потребителските процеси.

Разрешаването на сърдечни удари означава, че трябва редовно да изпращате сърдечни удари, напр. ако сърдечният ритъм е зададен на 1 секунда, тогава трябва да изпращате сърдечен ритъм на всяка секунда или повече или дистанционното ще затвори връзката.

Това означава, че трябва да използвате отделна нишка или да използвате async io, за да изпращате надеждно сърдечни удари навреме, и тъй като връзката не може да бъде споделена между нишките, това ни оставя с async io.

Добрата новина е, че вероятно няма да получите голяма полза от добавянето на сърдечни удари към връзка само за производство.

person asksol    schedule 29.01.2013
comment
Честно :) Кой тогава е най-добрият начин за справяне с изключението Broken Pipe, което получавам от време на време? Има ли начин да проверите връзката, ако не работи, свържете отново? Благодаря отново. - person Christopher Penkin; 29.01.2013
comment
Мисля, че този отговор в друга тема може да ви даде някои съвети? groups.google.com/forum/?fromgroups=#! topic/carrot-users/ - person asksol; 31.01.2013
comment
Това изглежда като това, което ми трябва. Направих промените в кода, да ви уведомя дали ще се погрижи за проблемите ми с връзката :) - person Christopher Penkin; 31.01.2013
comment
Страхотно, изглежда работи. Не съм получавал повече изключения за Broken Pipe, откакто внедрих моя Producer, както е показано в групата на Google. Благодаря @asksol. - person Christopher Penkin; 02.02.2013