AMQP basic.get одновременные потребители, извлекающие из очереди

При использовании RabbitMQ в качестве брокера сообщений у меня есть сценарий, в котором несколько одновременных потребителей извлекают сообщения из очереди с помощью метода basic.get AMQP и используют явное подтверждение для удаления сообщения из очереди. Предполагая следующую настройку

Q имеет сообщения M1, M2, M3 и имеет подключенных к нему потребителей C1, C2 и C3 (каждый из которых имеет собственное соединение и канал).

  1. Как обрабатывается параллелизм в методе basic.get? Синхронизирован ли вызов метода basic.get для обработки одновременных потребителей, каждый из которых использует свое собственное соединение и канал? C1, C2 и C3 выполняют вызов basic.get для одновременного получения сообщения (предположим, что сервер получает все 3 запроса одновременно).

  2. C1 запрашивает сообщение с помощью basic.get и получает M1. Когда C2 запрашивает сообщение, поскольку он использует другое соединение, он снова получает M1?

  3. Как потребители могут получать сообщения партиями заранее определенного размера?


person java_geek    schedule 21.11.2014    source источник
comment
Не понятно, о чем ваш вопрос.   -  person pinepain    schedule 21.11.2014
comment
Второй пункт НЕТ. Остальные пункты мне непонятны! Что вы имеете в виду под синхронизацией для обработки одновременных потребителей! RMQ — это система FIFO, поэтому вы помещаете 3 сообщения, и C1 получает первое, C2 — секунды и т. д. Предполагая, что вы используете prefetch = 1.   -  person Gabriele Santomaggio    schedule 22.11.2014
comment
Помог ли мой ответ ниже? Если это так, пожалуйста, отметьте это как ответ. Если нет, то как мне уточнить?   -  person theMayer    schedule 08.07.2015


Ответы (3)


Ваши вопросы действительно затрагивают суть теории очередей и процессов, поэтому я отвечу с этой точки зрения (RabbitMQ на самом деле является универсальным брокером сообщений, что касается моих ответов, поскольку это относится к любому брокеру сообщений).

Как обрабатывается параллелизм в методе basic.get? Синхронизирован ли вызов метода basic.get для обработки одновременных потребителей, каждый из которых использует свое собственное соединение и канал? C1, C2 и C3 выполняют вызов basic.get для одновременного получения сообщения (предположим, что сервер получает все 3 запроса одновременно).

Ответ 1: RabbitMQ разработан как надежный брокер сообщений. Он содержит внутренние процессы и элементы управления, чтобы гарантировать, что одно и то же сообщение не будет передано разным потребителям несколько раз. Теперь из-за непрактичности тестирования сценария, который вы описываете, он работает идеально? Кто знает. Вот почему правильно спроектированные приложения, использующие архитектуру на основе сообщений, будут использовать идемпотентные транзакции, так что если одна и та же транзакция обрабатывается несколько раз, результат будет таким же, как если бы транзакция была обработана один раз. Вывод. Разработайте приложение так, чтобы ответ на этот вопрос не имел значения.

C1 запрашивает сообщение с помощью basic.get и получает M1. Когда C2 запрашивает сообщение, поскольку он использует другое соединение, он снова получает M1?

Ответ 2: Нет. С учетом предположений моего предыдущего ответа брокер RabbitMQ не будет возвращать то же самое сообщение после его доставки. В зависимости от настроек канала и очереди сообщение может быть автоматически подтверждено при доставке и никогда не будет доставлено повторно. Другие настройки будут иметь автоматическую повторную очередь сообщения после «смерти» потока/канала обработки или отрицательного подтверждения от вашего потока обработки. Это важная функциональность, так как «ядовитое» сообщение может неоднократно сеять хаос в вашем приложении, если оно может быть доставлено нескольким потребителям. Вывод: вы можете смело полагаться на это предположение при разработке своего приложения.

Как потребители могут получать сообщения партиями заранее определенного размера?

Ответ: они не могут, да и смысла в этом нет. В любой системе массового обслуживания основное предположение состоит в том, что элементы удаляются из очереди одним файлом. Попытки нарушить это предположение приводят к непредсказуемому поведению; кроме того, единичный поток обычно является наиболее эффективным методом обработки. Однако в реальном мире бывают случаи, когда необходим размер пакета > 1. В таких случаях имеет смысл загружать пакет в отдельное сообщение, поэтому для этого может потребоваться отдельный поток обработки, который извлекает сообщения из очереди и группирует их вместе или изначально помещает их в пакеты. Имейте в виду, что если у вас есть несколько потребителей, невозможно гарантировать, что отдельные сообщения будут обработаны по порядку. Вывод. По возможности следует избегать пакетной обработки, но если это нецелесообразно, вы не можете предполагать, что пакеты будут содержать отдельные сообщения в каком-либо определенном порядке.

person theMayer    schedule 23.11.2014

Вы можете прочитать Руководство по RabbitMQ API и введение в Amqp.

Прежде всего, избегайте использования сообщений с использованием basicGet в ваших потребителях. Вместо этого используйте потребительский интерфейс basicConsume. Это позволяет RabbitMq отправлять вам сообщения по мере их поступления в очередь. Все остальное — это талия ресурсов, поскольку все сводится к занятому опросу.

При использовании basicConsume RabbitMq даже будет отправлять вам больше сообщений в фоновом режиме до определенного количества prefetch. Это позволяет обрабатывать несколько сообщений одновременно, а также сводит к минимуму время ожидания обработки следующего сообщения (если какое-либо сообщение доступно).

Параллелизм вообще не проблема, это то, для чего вы используете очередь! При наличии нескольких потребителей в одной очереди сообщение всегда будет доставлено только одному потребителю (пока сообщение подтверждено ACK). В противном случае вам нужны частные очереди для каждого потребителя и соответствующим образом маршрутизируйте свои сообщения.

Кстати, если вы можете поделиться соединением со своими потребителями, вы должны это сделать. Просто убедитесь, что вы используете один канал на поток.

person Moritz    schedule 22.11.2014
comment
Существуют законные причины для использования метода извлечения вместо метода отправки для получения сообщений. - person theMayer; 23.11.2014
comment
Конечно, есть веские причины для метода вытягивания! Я бы просто не считал это типичным вариантом использования. - person Moritz; 23.11.2014
comment
На самом деле это вопрос семантики программы, так как результирующее поведение приложения остается прежним. - person theMayer; 23.11.2014
comment
Каковы законные причины для использования метода вытягивания по сравнению с методом проталкивания? Все говорят, что это неправильно, и никто не говорит, когда это правильно. - person Eugene To; 21.12.2015
comment
С точки зрения архитектуры, по крайней мере, в текущей версии RabbitMQ метод pull проще реализовать. Это также менее сложно, требуется меньше настроек. Когда потребитель готов к сообщению, он получает сообщение. Это также соответствует современной теории производственных систем, которая делает упор на вытягивание через канбан. - person theMayer; 27.03.2017

Для этого сценария не требуется специальной настройки. Каждый клиент будет атомарно извлекать и получать одно сообщение из очереди, как вы и хотели.

person Fitzsimmons    schedule 21.11.2014
comment
Итак, выполняет ли RabbitMQ часть синхронизации? Реализует ли он некоторую блокировку в запросе basic.get? - person java_geek; 22.11.2014
comment
Перефразируя мой вопрос: как обрабатывается синхронизация на basic.get? Есть ли какая-то блокировка в запросе basic.get? - person java_geek; 22.11.2014