Заставить работников мгновенно получать ошибочные сообщения, когда включен режим постоянной доставки

У меня есть RabbitMQ Server, на котором я получаю сообщения, используя Python-Pika. Проблема в том, что если у меня есть режим постоянной доставки включен, и рабочие процессы не могут обработать сообщение. Вместо выпуска сообщения он будет хранить его до тех пор, пока не будет сброшено соединение RabbitMQ.

Есть ли способ убедиться, что сообщение, которое не удалось обработать, будет автоматически получено снова в течение разумного периода времени от доступного исполнителя, включая того же самого?

Это мой текущий код

if success:
    ch.basic_ack(delivery_tag=method.delivery_tag)
else:
    syslog.syslog('Error (Callback) -- Failed to process payload: %s' % body)

Идея состоит в том, что я никогда не хочу терять сообщение, вместо этого я хочу, чтобы оно было повторно опубликовано или, скорее, снова получено, если оно не удалось. Так должно быть всегда, пока сообщение не будет успешно обработано исполнителем. Обычно это происходит, когда один из рабочих не может открыть соединение с HTTP-сервером.


person eandersson    schedule 15.04.2013    source источник


Ответы (1)


Наконец-то я понял, почему это происходит. Я не понимал, что было недостаточно просто подтвердить, что вы закончили с сообщением, но также нужно было отклонить любое сообщение, которое вы не могли обработать с помощью channel.basic_reject. Это может показаться очевидным, но это не поведение по умолчанию для AMQP.

По сути, мы должны выпустить сообщение, используя basic_reject с requeue, установленным на True. Важным фактором здесь является ключевое слово requeue, которое предотвращает отбрасывание сообщения и вместо этого снова ставит его в очередь, чтобы один из наших доступных рабочих мог его обработать.

if success:
    # On Success - Mark message as processed.
    ch.basic_ack(delivery_tag=method.delivery_tag)
else:
    # Else - Mark message as rejected and move it back to the queue.
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

Я нашел действительно полезную информацию в этой статье, и более подробная техническая информация о ключевом слове reject приведена в этот пост в блоге.

person eandersson    schedule 15.04.2013