ActiveMQ: как отправить одну и ту же группу сообщений в один потребительский поток

Мы используем ActiveMQ в нашем проекте, у нас есть требование, чтобы группа сообщений, принадлежащая одной группе, использовалась одним потоком потребителя.

Например, у нас есть 3 (M1, M2, M3) сообщения, которые принадлежат человеку A, и 3 (M4, M5, M6) сообщения, которые принадлежат человеку B.

Наше требование: все сообщения, принадлежащие человеку A, должны потребляться потоком-потребителем 1, а все сообщения, принадлежащие человеку B, должны потребляться потоком-потребителем 2.

Основываясь на документации ActiveMQ и насколько мы понимаем, мы использовали концепцию JMXGroupId в ActiveMQ. Например, мы установили JMSXGroupId как 123 для всех сообщений (M1, M2, M3), принадлежащих лицу A, и 234 для всех сообщений (M4, M5, M6), принадлежащих лицу B.

С JMXGroupId в некоторых случаях все сообщения, принадлежащие лицам A и B (M1, M2, M3, M4, M5 и M6), отправляются в один и тот же поток потребителя 1. Но в нашем требовании они должны отправляться в два разных потока.


person Bala    schedule 01.03.2020    source источник
comment
Отвечал ли мой ответ на ваш вопрос? Если да, отметьте его как правильный, чтобы помочь другим пользователям, у которых может возникнуть такой же вопрос в будущем. Если нет, уточните, что не было рассмотрено. Спасибо!   -  person Justin Bertram    schedule 04.03.2020


Ответы (1)


Похоже, эта тема уже обсуждалась на список рассылки пользователей ActiveMQ. В результате этого обсуждения возникла эта проблема, и похоже, что решение этой проблемы включает отправка сообщения со свойством JMSXGroupSeq, значение которого равно -1. Как указано в Jira:

Ключом к перебалансировке теперь, когда групповая нагрузка является частью приоритета потребителя, является периодическое завершение группы с использованием свойства отрицательного порядкового номера. message.setIntProperty("JMSXGroupSeq", -1);

К сожалению, с этой реализацией возникла проблема, которая описана в этой проблеме. . Об этой проблеме сообщалось в конце 2010 года (то есть 10 лет назад), и, похоже, с тех пор ничего не было сделано для ее решения.

Поэтому я вижу для вас три разных варианта:

  1. Исправьте проблему в кодовой базе ActiveMQ самостоятельно. Это одно из больших преимуществ программного обеспечения с открытым исходным кодом — вы можете исправлять ошибки и реализовывать функции, которые вам нужны, не дожидаясь кого-либо еще.
  2. Перейдите на ActiveMQ Artemis, который уже поддерживает эту функцию, как указано в документацию. ActiveMQ Artemis — это брокер следующего поколения от ActiveMQ, который должен стать ActiveMQ 6, когда достигнет достаточного паритета функций. ActiveMQ Artemis поддерживает все те же протоколы, что и ActiveMQ 5.x, поэтому вам не нужно менять ни один из ваших клиентов.
  3. Откажитесь от группировки сообщений и просто используйте селекторы для ваших потребителей. Например, каждое сообщение, принадлежащее лицу 1, может иметь свойство с именем personIndex со значением 1, а потребитель, который должен получать эти сообщения, может использовать селектор, например personIndex = 1, и тогда каждое сообщение, принадлежащее лицу 2, может иметь свойство с именем personIndex со значением 1. значение 2, и потребитель, который должен получать эти сообщения, может использовать селектор, например personIndex = 2.
person Justin Bertram    schedule 01.03.2020
comment
Спасибо за ответ. мы думали о реализации варианта 3 ранее, но проблема, с которой мы столкнулись, заключается в том, что мы можем получать сообщения, связанные с разными людьми, и, следовательно, если мне нужно установить индекс, мне, возможно, придется использовать какое-то огромное число, а также мне нужно создать столько входящих разъемы каждый штраф использует один селекторный фильтр. - person Bala; 02.03.2020
comment
что-то подобное мне нужно создать в mule‹jms:inbound-endpoint queue=syncpoc Connector-ref=Active_MQ doc:name=JMS Priority 1› ‹jms:transaction action=ALWAYS_BEGIN /› ​​‹jms:selector expression=JMSPriority = 1/› ‹/jms:входящая-конечная точка› - person Bala; 02.03.2020
comment
‹jms:inbound-endpoint queue=syncpoc Connector-ref=Active_MQ doc:name=JMS Priority 2› ‹jms:transaction action=ALWAYS_BEGIN /› ​​‹jms:selector expression=JMSPriority = 2/› ‹ /jms:inbound-endpoint›‹jms:inbound-endpoint queue=syncpoc Connector-ref=Active_MQ doc:name=JMS Priority 3› ‹jms:transaction action=ALWAYS_BEGIN /› ​​‹jms:selector expression=JMSPriority = 3 /› ‹/jms:inbound-endpoint›, если вы видите, что количество входящих конечных точек в моем коде будет экспоненциально увеличиваться. - person Bala; 02.03.2020
comment
Я предполагаю, что это исключает вариант 3, поэтому варианты 1 и 2 остаются потенциальными решениями. - person Justin Bertram; 03.03.2020