Я новичок в использовании RabbitMQ и Pika, поэтому извините, если ответ очевиден...
Мы загружаем некоторые данные и передаем результаты в нашу очередь сообщений rabbitmq. Очередь используется процессом, который записывает данные в elasticsearch.
Данные создаются быстрее, чем они могут быть загружены в эластичный поиск, и, следовательно, очередь растет и почти никогда не сокращается.
Мы используем pika и получаем предупреждение:
UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.
Это продолжается некоторое время, пока Pika просто не вылетает со странным сообщением об ошибке:
NameError: global name 'log' is not defined
Мы используем объект Pika BlockingConnection (http://pika.github.com/connecting.html#blockingconnection).
Мой план исправить это — использовать функцию add_backpressure_callback, чтобы иметь функцию, которая будет вызывать time.sleep(0.5)
каждый раз, когда нам нужно применить обратное давление. Однако кажется, что это слишком простое решение и что должен быть более подходящий способ справиться с чем-то подобным.
Я предполагаю, что это обычная ситуация, когда очередь заполняется быстрее, чем она используется. Я ищу пример или даже совет о том, как лучше всего замедлить очередь.
Спасибо!