Pika/RabbitMQ: правильное использование add_backpressure_callback

Я новичок в использовании RabbitMQ и Pika, поэтому извините, если ответ очевиден...

Мы загружаем некоторые данные и передаем результаты в нашу очередь сообщений rabbitmq. Очередь используется процессом, который записывает данные в elasticsearch.

Данные создаются быстрее, чем они могут быть загружены в эластичный поиск, и, следовательно, очередь растет и почти никогда не сокращается.

Мы используем pika и получаем предупреждение:

UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.

Это продолжается некоторое время, пока Pika просто не вылетает со странным сообщением об ошибке:

NameError: global name 'log' is not defined

Мы используем объект Pika BlockingConnection (http://pika.github.com/connecting.html#blockingconnection).

Мой план исправить это — использовать функцию add_backpressure_callback, чтобы иметь функцию, которая будет вызывать time.sleep(0.5) каждый раз, когда нам нужно применить обратное давление. Однако кажется, что это слишком простое решение и что должен быть более подходящий способ справиться с чем-то подобным.

Я предполагаю, что это обычная ситуация, когда очередь заполняется быстрее, чем она используется. Я ищу пример или даже совет о том, как лучше всего замедлить очередь.

Спасибо!


person chaimp    schedule 08.08.2012    source источник


Ответы (1)


Интересная проблема, и, как вы правильно заметили, это, вероятно, довольно распространено. Я видел еще один связанный вопрос о переполнении стека с некоторыми указателями

Pika: предупреждение о превышении буфера записи

Кроме того, если вы хотите рассмотреть возможность расширения вашего elasticsearch, это, возможно, основное узкое место, которое вы хотите исправить. Быстрый просмотр веб-сайта elasticsearch.org привел к

"Распределенный

Одной из главных особенностей Elastic Search является его распределенный характер. Индексы разбиты на сегменты, каждый из которых имеет 0 или более реплик. Каждый узел данных в кластере содержит один или несколько сегментов и выступает в качестве координатора для делегирования операций нужным сегментам. Ребалансировка и маршрутизация выполняются автоматически и за кулисами. "

(... хотя не уверен, что вставка также распределена и масштабируема)

В конце концов, RabbitMQ не должен бесконечно увеличивать очереди. Также, возможно, вы захотите рассмотреть масштабирование самого RabbitMQ, например, используя такие вещи, как процессы для каждой очереди и т. д. в конфигурации RabbitMQ.

Ваше здоровье!

person Sachin    schedule 17.08.2012
comment
Спасибо за советы. В итоге я исключил RabbitMQ из уравнения, и теперь все работает гладко. Я понял, что если бы у меня было несколько машин, на которых я мог бы масштабировать что угодно, тогда RabbitMQ был бы отличным способом подачи данных, но поскольку я обрабатываю данные на той же машине, на которой выполняется эластичный поиск, это не так. Не имеет смысла иметь брокера сообщений посередине. В этом случае он служит только дополнительной точкой отказа. Однако спасибо за советы, если мы в конечном итоге масштабируем нашу систему. - person chaimp; 17.08.2012