Поведение исходящего адаптера Spring AMQP при отсутствии обмена

У меня есть код интеграции Spring, который подключает слушателя JMS (который слушает локальную очередь серии mq)

    <int-jms:message-driven-channel-adapter>

и пересылает сообщение rabbitmq (не контролируемому мной) через

    <int-amqp:outbound-channel-adapter>

Я пытаюсь сохранить это транзакционное, что означает, что если rabbitmq не получил сообщение, я хочу сохранить его в локальной очереди серии MQ. Однако я заметил, что если обмен, упомянутый в конфигурациях rabbitmq, не существует, я вижу в своих журналах эту строку:

ERROR (CachingConnectionFactory.java:292)     - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'xxx', class-id=60, method-id=40)

Но мое сообщение исчезает для локальной очереди серии mq.

Что мне сделать, чтобы он повторил попытку отправки сообщения, если брокер rabbitmq не работает или обмен не существует?

Спасибо за помощь, Моя конфигурация:

   <int-jms:message-driven-channel-adapter
        id="x.y.z" channel="channel-in"
        error-channel="errorChannel" header-mapper="jmsIntegrationHeaderMapper"
        acknowledge="transacted" destination-name="a.b.c" />

    <int:channel id="channel-in">

    </int:channel>

    <int:header-enricher input-channel="channel-in"
        output-channel="channel-out">
        <int:header name="url"
            expression="'amqp://${amqp.user}@${amqp.host}:${amqp.port}/${amqp.vhost}'"></int:header>
    </int:header-enricher>


    <int:channel id="channel-out">
    </int:channel>

    <int-amqp:outbound-channel-adapter
        channel="channel-out" amqp-template="amqpTemplate"
        routing-key="crd" mapped-request-headers="*" exchange-name="${amqp.exchange}">
    </int-amqp:outbound-channel-adapter>


    <rabbit:connection-factory id="amqpConnectionFactory" addresses="${amqp.host}:${amqp.port}"
                           cache-mode="CONNECTION"
                           channel-cache-size="25"
                           username="${amqp.user}"
                           password="${amqp.pass}"
                           virtual-host="${amqp.vhost}"/>

    <rabbit:template id="amqpTemplate"
                 connection-factory="amqpConnectionFactory" mandatory="true" channel-transacted="true"/>

person mortada    schedule 07.08.2019    source источник
comment
Какую версию Spring Integration вы используете?   -  person Gary Russell    schedule 07.08.2019
comment
Я использую весеннюю интеграцию 4.   -  person mortada    schedule 07.08.2019
comment
Вам нужно установить acknowledge=transcted, чтобы исключение откатило транзакцию; мы изменили значение по умолчанию на то, что было в 4.2.   -  person Gary Russell    schedule 07.08.2019
comment
Он уже был установлен <int-jms:message-driven-channel-adapter id="adapter.jms.xxxx" channel="channel.direct.incoming-xxxx" error-channel="errorChannel" header-mapper="jmsIntegrationHeaderMapper" acknowledge="transacted" destination-name="xxxx" /> Странно то, что нет исключения только строка ошибки, о которой я упоминал в сообщении.   -  person mortada    schedule 08.08.2019
comment
Я использую CachingConnectionFactor public void shutdownCompleted(ShutdownSignalException cause) { if (RabbitUtils.isPassiveDeclarationChannelClose(cause)) { if (logger.isDebugEnabled()) { logger.debug("Channel shutdown: " + cause.getMessage()); } } else if (!RabbitUtils.isNormalChannelClose(cause)) { logger.error("Channel shutdown: " + cause.getMessage()); } } код только регистрирует ошибку, но не вызывает исключение, может быть, поэтому?   -  person mortada    schedule 08.08.2019


Ответы (1)


В отличие от JMS, публикация в RabbitMQ является асинхронной (поэтому обычно она выполняется быстрее), поэтому об ошибке из-за отсутствия обмена сообщается в другом потоке; вы можете добавить слушателя, чтобы получать уведомление, вместо того, чтобы просто регистрироваться.

Вы можете включить транзакции template.setChannelTransacted(true);, и тогда об ошибке будет сообщено в потоке публикации, потому что он будет заблокирован в ожидании ответа от txCommit() при возникновении ошибки ввода-вывода.

Однако транзакции довольно дороги и могут снизить производительность.

Для других ошибок (например, существует обмен, но нет маршрута к очереди), вы можете использовать подтверждения издателя и возврат, но, опять же, ожидание подтверждения для каждого отдельного опубликованного сообщения снизит производительность.

ИЗМЕНИТЬ

Вы должны получить исключение на txCommit.

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:107) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2003) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:865) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1841) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1784) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:864) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:927) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:921) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at com.example.So47454769Application.lambda$0(So47454769Application.java:29) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:780) [spring-boot-2.0.0.M7.jar:2.0.0.M7]
    ... 5 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1519) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:52) ~[amqp-client-5.0.0.jar:5.0.0]
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.txCommit(PublisherCallbackChannelImpl.java:588) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:981) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at com.sun.proxy.$Proxy60.txCommit(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:104) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.0.0.jar:5.0.0]
    ... 24 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:504) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) ~[amqp-client-5.0.0.jar:5.0.0]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

person Gary Russell    schedule 08.08.2019
comment
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" channel-transacted="true" mandatory="true" /> Я уже использую свойство channel-transhibited, но ошибки не возникает. - person mortada; 08.08.2019
comment
Возможно, вы не используете этот шаблон в исходящем адаптере - я добавил трассировку стека, которую вы должны увидеть. Отредактируйте вопрос, чтобы отобразить полную конфигурацию. - person Gary Russell; 08.08.2019
comment
Только что опубликовал свою конфигурацию, странно то, что у меня вообще нет этого исключения. - person mortada; 09.08.2019
comment
Хорошо, я попытался поместить имя обмена в <rabbit:template id="amqpTemplate" И теперь у меня есть исключение, и он откатился в очередь JMS, за исключением того, что вы упомянули, это здорово. Однако, если мне нужно повторить попытку автоматически, что мне следует использовать? Спасибо, - person mortada; 09.08.2019
comment
Непонятно, что вы имеете в виду; когда сообщение откатывается, оно будет повторно доставлено JMS (возможно, за любыми предварительно загруженными сообщениями). - person Gary Russell; 09.08.2019