Привязать потребителя RabbitMQ с помощью Spring Cloud Stream к существующей очереди

Я создал с помощью веб-интерфейса RabbitMQ обмен темами TX и привязал к обмену две очереди TX.Q1 и TX.Q2, каждый из них был связан с ключами маршрутизации rk1 и rk2 соответственно, и отправил на обмен несколько сообщений.

Теперь я хочу создать потребителя, использующего Spring Cloud Stream, который будет принимать сообщения только из первого квартала. Я пробовал использовать конфигурацию:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

и аннотацию @StreamListner(Sink.INPUT) для метода, потребляющего сообщения.

В результате я вижу, что потребитель создал очередь (или привязку) с тем же именем TX.Q1, но ключ маршрутизации новой очереди / привязки - #.
Как я могу настроить через Spring Cloud Stream потребителя, который будет получать сообщения из заранее определенной очереди (только те, которые маршрутизируются с помощью rk1).


person yuval simhon    schedule 13.12.2016    source источник


Ответы (4)


Так что на данный момент обходной путь, который предложил Гарри Рассел, решил проблему для меня.

Я использовал @RabbitListener вместо @StreamListenet следующим образом:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1").

В результате предопределенная очередь TX.Q1 привязывается с ключом привязки: rk1 к обмену TX.

Ожидаем обновлений по проблеме Spring Cloud Steream .

person yuval simhon    schedule 18.12.2016
comment
Прошло некоторое время с момента этого обходного пути, мне интересно, вышли ли вы с более чистым решением (например, с использованием свойств вместо аннотаций). - person iakko; 10.01.2019

Думаю, я нашел решение с помощью @StreamListener, а не с помощью обходного пути. Все делается в конфигурации, а не в коде.

Я использовал следующую конфигурацию (она в .yml, но вы можете легко перевести ее в .properties):

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: <binder_name>
          destination: TX
          group: Q1
      binders:
        <binder_name>:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: <port>
                virtual-host: <vhost>
                username: <username>
                password: <password>
      rabbit:
        bindings:
          input:
            consumer:
              binding-routing-key: rk1
              exchange-name: TX
              queue-name-group-only: true
              bind-queue: true
              exchange-durable: true
              exchange-type: topic

Используя этот подход, вам не нужно писать конкретный код, чтобы позволить потребителю RabbitMQ подключаться к вашему кластеру, это должно решить ваш случай.

Надеюсь это поможет.

person iakko    schedule 10.01.2019
comment
это работает не так, как ожидалось. Например, я точно указывал вышеуказанные настройки, но каждый раз, когда я перезапускаю свое приложение, я замечаю, что в RabbitMQ создается новая привязка с ключом маршрутизации #, хотя я уже определил другой ключ маршрутизации. Итак, всякий раз, когда я отправляю сообщение даже без ключа маршрутизации - я не ожидаю, что мой слушатель получит это сообщение - однако, поскольку новая привязка была создана автоматически (#), как указано в исходном вопросе, я не могу отправить сообщение с конкретный ключ маршрутизации. - person Neeraj Singh; 04.03.2019

Spring Cloud Stream устанавливает внутренним ключом маршрутизатора для конечной точки потребителя либо само имя пункта назначения (имя exchange), либо маршрутизацию на основе заголовка partition в случае статического разделения.

Я думаю, что эта проблема с github может быть актуальной к вашему случаю.

person Ilayaperumal Gopinathan    schedule 13.12.2016
comment
Итак, вы имеете в виду, что с облачным потоком Spring я не могу привязать потребителя к определенной предопределенной (не анонимной) очереди, у которой есть ключ маршрутизации? - person yuval simhon; 13.12.2016
comment
Я только что протестировал, и мы добавляем вторую привязку; Я думаю, что это ошибка - если очередь уже существует, мы не должны добавлять универсальную (# привязку с подстановочными знаками). В качестве обходного пути вы можете использовать @RabbitListener вместо @StreamListener (если вы не полагаетесь на прослушиватель потока для преобразования). Я открыл и выпуск для этого. - person Gary Russell; 13.12.2016

Рекомендуется использовать это свойство для потребителя, чтобы кролик мог потреблять из существующей очереди. Обратите внимание, что имя очереди будет выбрано только из свойства группы, а не из пункта назначения.

queueNameGroupOnly: true

Пример:

cloud:
stream:
  # rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
  rabbit:
    bindings:
      input:
        consumer:
          acknowledgeMode: AUTO
          bindingRoutingKey: DECISION_PERSISTENCE_KEY
          declareExchange: false
          bindQueue: false
          queueNameGroupOnly: true
          consumerTagPrefix: dpa-rabbit-consumer
  bindings:
    input:
      binder: rabbit
      group: DECISION_PERSISTENCE_QUEUE
      content-type: application/json
person Shivankur Pal    schedule 21.02.2019