rabbitMQ создает двух отдельных потребителей, которые потребляют из двух разных кластеров в одной и той же службе.

я пытаюсь создать двух отдельных потребителей, которые потребляют из двух разных кластеров в одной и той же службе.

я попытался создать новые соединения, но все равно оба обмена создаются только в одном из кластеров.

я что-то здесь упускаю? я использую spring-boot: 2.2.5.RELEASE и spring-rabbit: 2.2.5.RELEASE

Мой конфиг примерно такой.

@Configuration
@AllArgsConstructor
public class RabbitMQConfiguration {

  private final Connection_A_MQProperties connectionAMQProperties;
  private final Connection_B_MQProperties connectionBMQProperties;

  @Primary
  @Bean
  public ConnectionFactory connectionFactoryA(Connection_A_MQProperties connectionAMQProperties) {
    return createConnection(connectionAMQProperties.getBaseProperties());
  }

  @Bean
  public ConnectionFactory connectionFactoryB(Connection_B_MQProperties connectionBMQProperties) {
    return createConnection(connectionBMQProperties.getBaseProperties());
  }

  private ConnectionFactory createConnection(BaseProperties baseProperties){
     CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost(baseProperties.getHost());
    factory.setPort(baseProperties.getPort());
    factory.setUsername(baseProperties.getUsername());
    factory.setPassword(baseProperties.getPassword());
    factory.setConnectionTimeout(baseProperties.getConnectionTimeout());
    factory.setRequestedHeartBeat(baseProperties.getRequestedHeartBeat());
    factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    factory.setConnectionCacheSize(2);

    return factory;
  }

  @Primary
  @Bean("connection_A_RabbitAdmin")
  public RabbitAdmin connection_A_RabbitAdmin(@Qualifier("connection_A_RabbitTemplate") RabbitTemplate connection_A_RabbitTemplate) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_A_RabbitTemplate);

    rabbitAdmin.setExplicitDeclarationsOnly(true);
    return rabbitAdmin;
  }

  @Bean("connection_B_RabbitAdmin")
  public RabbitAdmin connection_B_RabbitAdmin(@Qualifier("connection_B_RabbitTemplate") RabbitTemplate connection_B_RabbitTemplate) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_B_RabbitTemplate);
    rabbitAdmin.setExplicitDeclarationsOnly(true);
    return rabbitAdmin;
  }

  /**
  * each declarable has been configured with a AbstractDeclarable.setAdminsThatShouldDeclare() method which contains the particular admin bean 
  * for which declarable should be processed. i have checked this is getting filtered out correctly for each of the rabbit admins.
  **/

  @Bean("connection_A_Declarable")
  public Declarables connection_A_Declarable(@Qualifier("connection_A_RabbitAdmin") RabbitAdmin connection_A_RabbitAdmin) {
    return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_A_RabbitAdmin))
      .baseProperties(connectionAMQProperties)
      .queueNames(Collections.singletonList(connectionAMQProperties.getQueue()))
      .build();
  }

  @Bean("connection_B_Declarable")
  public Declarables connection_B_Declarable(@Qualifier("connection_B_RabbitAdmin") RabbitAdmin connection_B_RabbitAdmin){
    return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_B_RabbitAdmin))
      .baseProperties(connectionBMQProperties)
      .queueNames(Collections.singletonList(connectionBMQProperties.getQueue()))
      .build();
  }


  @Primary
  @Bean("connection_A_RabbitTemplate")
  public RabbitTemplate connection_A_RabbitTemplate(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
        return rabbitTemplate(connectionFactoryA,connectionAMQProperties);
  }

  @Bean("connection_B_RabbitTemplate")
  public RabbitTemplate connection_B_RabbitTemplate(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB) {
    return rabbitTemplate(connectionFactoryB,connectionBMQProperties);
  }

  private RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,BaseProperties baseProperties) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(connectionFactory);
    template.setExchange(baseProperties.getExchange());
    template.setRoutingKey(baseProperties.getQueueName());
    template.setDefaultReceiveQueue(baseProperties.getQueueName());
    return template;
  }

  @Primary
  @Bean(name = "connection_A_ContainerFactory")
  public SimpleRabbitListenerContainerFactory connection_A_ContainerFactory(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
    return simpleRabbitListenerContainerFactory(connectionFactoryA,connectionAMQProperties);
  }

  @Bean(name = "connection_B_ContainerFactory")
  public SimpleRabbitListenerContainerFactory connection_B_ContainerFactory(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB){
    return simpleRabbitListenerContainerFactory(connectionFactoryB,connectionBMQProperties);
  }

  private SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory, BaseProperties baseProperties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setConcurrentConsumers(baseProperties.getConcurrentConsumer());
    factory.setMaxConcurrentConsumers(baseProperties.getMaxConcurrentConsumer());
    return factory;
  }
}

и слушатели настроены как (аналогично для второго слушателя)

@RabbitListener(
    queues = "${connection_a.queue}",
    containerFactory = "connectionFactoryA"
  )

НЕКОТОРЫЕ ЖУРНАЛЫ (отредактировано, чтобы скрыть некоторую информацию) (здесь вы можете видеть, что и обмены, и очереди объявляются правильно с соответствующим администратором).

o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: **-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#5cba890e:0/SimpleConnection@513bec8c [delegate=amqp://[email protected].**.**:5672/, localPort= 64992]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected].**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected].**.**:5672/,1), conn: Proxy@1d6014a7 Shared Rabbit Connection: SimpleConnection@513bec8c [delegate=amqp://[email protected].**.**:5672/, localPort= 64992]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished



o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: message-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#dbca149:0/SimpleConnection@59cb10e0 [delegate=amqp://[email protected].**.**:5672/, localPort= 64994]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected].**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected].**.**:5672/,1), conn: Proxy@3d763ae5 Shared Rabbit Connection: SimpleConnection@59cb10e0 [delegate=amqp://[email protected].**.**:5672/, localPort= 64994]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished


person Nitish Kumar    schedule 15.06.2020    source источник
comment
Конфиг выглядит нормально. Предоставляет ли журнал DEBUG какие-либо подсказки?   -  person Gary Russell    schedule 15.06.2020
comment
я вижу, что объявляемые объекты правильно объявлены в журналах с конкретными соединениями, но кроме этого я не вижу ничего существенного, связанного с созданием очередей. я добавлю журналы здесь.   -  person Nitish Kumar    schedule 16.06.2020
comment
привет, Гэри, я обновил журналы, может быть, это поможет.   -  person Nitish Kumar    schedule 17.06.2020
comment
>here you can see both exchanges and queues are getting declared correctly with respective admin - так в чем тогда проблема?   -  person Gary Russell    schedule 17.06.2020
comment
Судя по логам так и есть, но все обмены создаются только на одном из кроличьих хостов   -  person Nitish Kumar    schedule 17.06.2020
comment
Но вы сказали correctly with respective admin .   -  person Gary Russell    schedule 17.06.2020
comment
да, так что есть два администратора кролика, которые настроены на двух разных хостах кролика, и я ожидаю, что очереди будут созданы на соответствующих хостах так, как они настроены для соответствующих администраторов. из журналов кажется, что очереди правильно настраиваются администраторами, но все же все очереди создаются только на одном из хостов.   -  person Nitish Kumar    schedule 18.06.2020
comment
Это просто не имеет смысла; Извините; предложения противоречат друг другу.   -  person Gary Russell    schedule 18.06.2020
comment
Я попробую с образцом проекта и проверю, происходит ли это и там.   -  person Nitish Kumar    schedule 18.06.2020


Ответы (1)


Это работает, как и ожидалось для меня:

@SpringBootApplication
public class So62382630Application {

    public static void main(String[] args) {
        SpringApplication.run(So62382630Application.class, args);
    }

    @Bean
    @Primary
    ConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    ConnectionFactory cf2() {
        return new CachingConnectionFactory("10.0.0.21");
    }

    @Bean
    RabbitAdmin admin1() {
        return new RabbitAdmin(cf1());
    }

    @Bean
    RabbitAdmin admin2() {
        return new RabbitAdmin(cf2());
    }

    @Bean
    Queue q1() {
        Queue queue = new Queue("q1");
        queue.setAdminsThatShouldDeclare(admin1());
        return queue;
    }

    @Bean
    Queue q2() {
        Queue queue = new Queue("q2");
        queue.setAdminsThatShouldDeclare(admin2());
        return queue;
    }

    @Bean
    public ApplicationRunner runner(RabbitAdmin admin1, RabbitAdmin admin2) {
        return args -> {
            System.out.println(admin1.getQueueInfo("q1"));
            System.out.println(admin2.getQueueInfo("q2"));
        };
    }

}

Я вижу очереди, объявленные на соответствующих узлах.

person Gary Russell    schedule 18.06.2020
comment
я тоже пробовал это, но все равно обмен/очереди создаются на том же хосте. - person Nitish Kumar; 21.06.2020
comment
Невозможно. У вас должно быть что-то определено неправильно. - person Gary Russell; 21.06.2020