Существует ли RabbitMQ/pika, эквивалентный опции ZMQ CONFLATE?

Я хочу, чтобы мой pub/sub хранил только последнее доступное сообщение на канале и выгружал все предыдущие, которые не были бы использованы из-за того, что потребитель слишком медленный (или приостановлен). ZMQ имеет параметр CONFLATE, но есть ли у RabbitMQ аналогичный параметр на Python ( пика)?

Издатель обновляется с частотой 50 Гц, в то время как некоторые подписчики не могут обрабатывать так быстро (от 10 до 50 Гц). Я не хочу, чтобы они обрабатывали самое старое сообщение в очереди (что означает обработку уже устаревших данных); он должен обрабатывать только последнее доступное сообщение, когда оно готово к использованию.


person myoan    schedule 01.05.2018    source источник
comment
Можете ли вы предоставить более подробную информацию? У RabbitMQ есть несколько вариантов решения этой проблемы, но я не знаю, какой из них порекомендовать, поскольку в вашем вопросе недостаточно подробностей. Например, насколько медленно «слишком медленно»?   -  person theMayer    schedule 01.05.2018
comment
Издатель обновляет данные с частотой 50 Гц, в то время как некоторые подписчики не могут обрабатывать более 10–50 Гц. Я не хочу, чтобы они обрабатывали самое старое сообщение в очереди (что означает обработку данных, которые уже устарели), они должны обрабатывать только последнее доступное сообщение, когда оно готово к использованию. Однако было бы интересно объяснить все варианты, о которых вы думаете, с их преимуществами и недостатками для последующих читателей.   -  person myoan    schedule 01.05.2018
comment
Похоже, вам нужна структура данных, отличная от очереди. Очередь предназначена для хранения более чем одной вещи. Структура данных, которая содержит только одну вещь, довольно тривиальна.   -  person theMayer    schedule 02.05.2018


Ответы (1)


RabbitMQ фактически поддерживает описанный сценарий «из коробки» с помощью функции максимальной длины очереди. .

Из документации:

Максимальная длина очереди может быть ограничена заданным количеством сообщений. Во всех случаях используется количество готовых сообщений; неподтвержденные сообщения не учитываются при подсчете лимита.

Поведение по умолчанию для RabbitMQ, когда установлена ​​максимальная длина или размер очереди и достигается максимальное значение, заключается в отбрасывании или удалении недоставленных сообщений из начала очереди (т. е. самых старых сообщений в очереди).

person theMayer    schedule 01.05.2018
comment
Это не сработает для вариантов использования, таких как обновление цен. Например. когда цена на продукт X обновляется, публикуется сообщение. Цена обновляется чаще, чем потребители обрабатывают сообщения об обновлении, поэтому следует обрабатывать только самую последнюю цену. т.е. последняя победа является желаемой политикой. плагин дедупликации почти поможет вам в этом, но он применяет политику первого победителя. Kafka поддерживает этот OOTB. - person Charlie Reitzel; 14.04.2020
comment
@CharlieReitzel - такая ситуация не может существовать в правильно спроектированной системе. Конечный потребитель просто отбрасывает все входящие сообщения, когда он занят. В любом случае, это то, что эта функция делает для вас, поэтому я не вижу смысла, который вы пытаетесь сделать. - person theMayer; 14.04.2020
comment
Очередь последнего значения Google. Как я уже сказал, это хорошо известный вариант использования. Для меня совсем не оригинально :--) Поддерживается Kafka, ActiveMQ Artemis и другими. Но не RabbitMQ, насколько я могу судить. - person Charlie Reitzel; 15.04.2020
comment
@CharlieReitzel - похоже, вы не читали мой ответ ... посмотрите мой ответ. - person theMayer; 15.04.2020
comment
Я читаю это. И это прекрасно в качестве последней меры, чтобы уберечь rabbitmq от разрушения под нагрузкой. И я согласен, что этого не должно происходить при нормальной работе. Если / когда это произойдет, вы должны посмотреть на свое масштабирование. Мой комментарий не имеет к этому никакого отношения. Я говорю о другом: есть случаи использования в бизнесе, когда первый выигрывает дедупликацию на основе естественного ключа, определенного разработчиком приложения. Они не связаны с масштабированием rabbitmq или защитой системы очередей. Скорее, он использует систему очередей, чтобы помочь другим элементам более крупной системы избежать работы с уже устаревшими данными. - person Charlie Reitzel; 16.04.2020