Я пытаюсь создать в одном и том же сервисе двух отдельных потребителей, которые читают сообщения из двух разных кластеров RabbitMQ.
Я попробовал создать отдельные соединения, но оба обменника (exchange) всё равно создаются только в одном из кластеров.
Я что-то упускаю? Я использую spring-boot 2.2.5.RELEASE и spring-rabbit 2.2.5.RELEASE.
Моя конфигурация выглядит примерно так:
/**
 * Spring configuration that wires two independent RabbitMQ stacks ("A" and "B").
 *
 * <p>For each stack it declares a {@link ConnectionFactory}, a {@link RabbitTemplate},
 * a {@link RabbitAdmin}, a {@link Declarables} bean and a
 * {@link SimpleRabbitListenerContainerFactory}. The "A" beans are marked
 * {@link Primary}; the "B" beans must be selected by qualifier / bean name.
 */
@Configuration
@AllArgsConstructor
public class RabbitMQConfiguration {

    private final Connection_A_MQProperties connectionAMQProperties;
    private final Connection_B_MQProperties connectionBMQProperties;

    /** Primary connection factory, built from the base properties of connection A. */
    @Primary
    @Bean
    public ConnectionFactory connectionFactoryA(Connection_A_MQProperties connectionAMQProperties) {
        return createConnection(connectionAMQProperties.getBaseProperties());
    }

    /** Connection factory built from the base properties of connection B. */
    @Bean
    public ConnectionFactory connectionFactoryB(Connection_B_MQProperties connectionBMQProperties) {
        return createConnection(connectionBMQProperties.getBaseProperties());
    }

    /**
     * Builds a {@link CachingConnectionFactory} from the given properties.
     *
     * <p>NOTE(review): the Spring AMQP reference states that RabbitAdmin automatic
     * declaration is not supported when the cache mode is CONNECTION. Confirm that this
     * mode is intended, given that this class also registers Declarables for the admins.
     *
     * @param baseProperties host, port, credentials, timeout and heartbeat settings
     * @return a factory caching connections, with a connection cache size of 2
     */
    private ConnectionFactory createConnection(BaseProperties baseProperties) {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();

        // Broker address and credentials.
        cachingFactory.setHost(baseProperties.getHost());
        cachingFactory.setPort(baseProperties.getPort());
        cachingFactory.setUsername(baseProperties.getUsername());
        cachingFactory.setPassword(baseProperties.getPassword());

        // Connection tuning.
        cachingFactory.setConnectionTimeout(baseProperties.getConnectionTimeout());
        cachingFactory.setRequestedHeartBeat(baseProperties.getRequestedHeartBeat());
        cachingFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        cachingFactory.setConnectionCacheSize(2);

        return cachingFactory;
    }

    /** Primary admin, backed by the template of connection A. */
    @Primary
    @Bean("connection_A_RabbitAdmin")
    public RabbitAdmin connection_A_RabbitAdmin(
            @Qualifier("connection_A_RabbitTemplate") RabbitTemplate connection_A_RabbitTemplate) {
        return explicitOnlyAdmin(connection_A_RabbitTemplate);
    }

    /** Admin backed by the template of connection B. */
    @Bean("connection_B_RabbitAdmin")
    public RabbitAdmin connection_B_RabbitAdmin(
            @Qualifier("connection_B_RabbitTemplate") RabbitTemplate connection_B_RabbitTemplate) {
        return explicitOnlyAdmin(connection_B_RabbitTemplate);
    }

    /** Creates a {@link RabbitAdmin} on the given template with explicit-declarations-only enabled. */
    private RabbitAdmin explicitOnlyAdmin(RabbitTemplate sourceTemplate) {
        RabbitAdmin admin = new RabbitAdmin(sourceTemplate);
        admin.setExplicitDeclarationsOnly(true);
        return admin;
    }

    /**
     * Declarables for connection A, built by {@code DeclarableMQFactory} with admin A as
     * the only admin in its admin list.
     *
     * <p>Per the original author, each declarable is configured through
     * {@code AbstractDeclarable.setAdminsThatShouldDeclare()} with the admin that should
     * process it; that wiring lives in {@code DeclarableMQFactory} and is not visible here.
     */
    @Bean("connection_A_Declarable")
    public Declarables connection_A_Declarable(
            @Qualifier("connection_A_RabbitAdmin") RabbitAdmin connection_A_RabbitAdmin) {
        return DeclarableMQFactory.builder()
                .rabbitAdminList(Collections.singletonList(connection_A_RabbitAdmin))
                .baseProperties(connectionAMQProperties)
                .queueNames(Collections.singletonList(connectionAMQProperties.getQueue()))
                .build();
    }

    /**
     * Declarables for connection B, built by {@code DeclarableMQFactory} with admin B as
     * the only admin in its admin list.
     */
    @Bean("connection_B_Declarable")
    public Declarables connection_B_Declarable(
            @Qualifier("connection_B_RabbitAdmin") RabbitAdmin connection_B_RabbitAdmin) {
        return DeclarableMQFactory.builder()
                .rabbitAdminList(Collections.singletonList(connection_B_RabbitAdmin))
                .baseProperties(connectionBMQProperties)
                .queueNames(Collections.singletonList(connectionBMQProperties.getQueue()))
                .build();
    }

    /** Primary template, bound to {@code connectionFactoryA} and the properties of connection A. */
    @Primary
    @Bean("connection_A_RabbitTemplate")
    public RabbitTemplate connection_A_RabbitTemplate(
            @Qualifier("connectionFactoryA") ConnectionFactory connectionFactoryA) {
        return rabbitTemplate(connectionFactoryA, connectionAMQProperties);
    }

    /** Template bound to {@code connectionFactoryB} and the properties of connection B. */
    @Bean("connection_B_RabbitTemplate")
    public RabbitTemplate connection_B_RabbitTemplate(
            @Qualifier("connectionFactoryB") ConnectionFactory connectionFactoryB) {
        return rabbitTemplate(connectionFactoryB, connectionBMQProperties);
    }

    /**
     * Builds a {@link RabbitTemplate} on the given connection factory.
     *
     * <p>The exchange comes from the properties; the queue name is used both as the
     * routing key and as the default receive queue.
     */
    private RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, BaseProperties baseProperties) {
        RabbitTemplate rabbit = new RabbitTemplate();
        rabbit.setConnectionFactory(connectionFactory);
        rabbit.setExchange(baseProperties.getExchange());
        rabbit.setRoutingKey(baseProperties.getQueueName());
        rabbit.setDefaultReceiveQueue(baseProperties.getQueueName());
        return rabbit;
    }

    /** Primary listener container factory, bound to {@code connectionFactoryA}. */
    @Primary
    @Bean(name = "connection_A_ContainerFactory")
    public SimpleRabbitListenerContainerFactory connection_A_ContainerFactory(
            @Qualifier("connectionFactoryA") ConnectionFactory connectionFactoryA) {
        return simpleRabbitListenerContainerFactory(connectionFactoryA, connectionAMQProperties);
    }

    /** Listener container factory bound to {@code connectionFactoryB}. */
    @Bean(name = "connection_B_ContainerFactory")
    public SimpleRabbitListenerContainerFactory connection_B_ContainerFactory(
            @Qualifier("connectionFactoryB") ConnectionFactory connectionFactoryB) {
        return simpleRabbitListenerContainerFactory(connectionFactoryB, connectionBMQProperties);
    }

    /**
     * Builds a listener container factory on the given connection factory with AUTO
     * acknowledge mode and the consumer concurrency limits taken from the properties.
     */
    private SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, BaseProperties baseProperties) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        containerFactory.setConcurrentConsumers(baseProperties.getConcurrentConsumer());
        containerFactory.setMaxConcurrentConsumers(baseProperties.getMaxConcurrentConsumer());
        return containerFactory;
    }
}
Слушатели настроены так (для второго слушателя — аналогично):
// containerFactory must name a RabbitListenerContainerFactory bean (here the
// SimpleRabbitListenerContainerFactory bean "connection_A_ContainerFactory"),
// not the ConnectionFactory bean "connectionFactoryA".
@RabbitListener(
        queues = "${connection_a.queue}",
        containerFactory = "connection_A_ContainerFactory"
)
Фрагмент логов (отредактирован, чтобы скрыть часть информации). Здесь видно, что и обменники, и очереди объявляются правильно — каждый соответствующим администратором (RabbitAdmin).
o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: **-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory : Created new connection: **ConnectionFactory#5cba890e:0/SimpleConnection@513bec8c [delegate=amqp://[email protected].**.**:5672/, localPort= 64992]
o.s.retry.support.RetryTemplate : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected].**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected].**.**:5672/,1), conn: Proxy@1d6014a7 Shared Rabbit Connection: SimpleConnection@513bec8c [delegate=amqp://[email protected].**.**:5672/, localPort= 64992]
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.connectionA.exchange'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.connectionA.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionA.**'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionA.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: message-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory : Created new connection: **ConnectionFactory#dbca149:0/SimpleConnection@59cb10e0 [delegate=amqp://[email protected].**.**:5672/, localPort= 64994]
o.s.retry.support.RetryTemplate : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected].**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected].**.**:5672/,1), conn: Proxy@3d763ae5 Shared Rabbit Connection: SimpleConnection@59cb10e0 [delegate=amqp://[email protected].**.**:5672/, localPort= 64994]
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.**.connectionB.exchange'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.**.connectionB.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionB.**'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionB.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
> here you can see both exchanges and queues are getting declared correctly with respective admin
— Так в чём тогда проблема? — Gary Russell, 17.06.2020
. - person Gary Russell   schedule 17.06.2020