Spring Integration JMS с откатом JTA при переходе сообщения в errorChannel

Я использую Spring Integration с поддержкой JTA через Atomikos и JMS, привязанные к разным Webshpere MQ как для входящих, так и для исходящих сообщений. Поток следующий:

  • Адаптер входящего канала JMS получает сообщение
  • некоторые преобразования
  • Адаптер исходящего канала JMS в очередь вывода
  • при возникновении ошибки errorChannel получает сообщение
  • Маршрутизатор типа исключения направляет необработанные ошибки в настраиваемую службу повторного выброса, а обработанные - в список-получателей-маршрутизатор, который отправляет их в 2 очереди ошибок

Моя проблема в том, что я хочу, чтобы транзакция была зафиксирована, даже если сообщение идет вниз по каналу errorChannel (в случае обработанного исключения и в очереди ошибок). Насколько я понимаю, откат должен происходить только в том случае, если Exception повторно генерируется (поэтому я повторно выбрасываю необработанные), но в моем случае откат транзакции происходит, как только сообщение достигает errorChannel (до того, как оно будет перенаправлено в другое место).

Что я делаю неправильно?

Конфигурация следует.

<jms:inbound-channel-adapter id="jms-in"
                             destination="input-queue"
                             connection-factory="inConnectionFactory"
                             channel="edi-inbound"
                             acknowledge="transacted">
    <poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
            fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
            >
        <transactional timeout="${process.tx.timeout-sec:60}"/>
    </poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>

<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
    <object-to-string-transformer/>
    <service-activator ref="inbound" method="service"/>
</chain>

<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
    <transformer ref="edi2xml-converter"/>
    <transformer ref="xml-mapper"/>
</chain>




<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
    <service-activator ref="outbound-message-composer" />
</chain>

<channel id="outbound-channel">
    <interceptors>
        <beans:ref bean="outbound-interceptor" />
    </interceptors>
</channel>

<recipient-list-router input-channel="outbound-channel">
    <recipient channel="file-outbound"/>
    <recipient channel="queue-outbound"/>
</recipient-list-router>



<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>



<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
                                   directory="${output.directory}"
                                   filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
                                   delete-source-files="true" />




<!-- notification outbound flow -->
<channel id="errorChannel">
    <interceptors>
        <wire-tap channel="logger"/>
    </interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>

<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
    <mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>

<recipient-list-router input-channel="error-notification-channel">
    <recipient channel="queue-outbound-error"/>
    <recipient channel="queue-inbound-error"/>
</recipient-list-router>

<chain input-channel="queue-outbound-error">
    <service-activator ref="outbound-error-composer" />
    <jms:outbound-channel-adapter id="jms-out-error"
                                  destination="error-output-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>

<chain input-channel="queue-inbound-error">
    <service-activator ref="error-notif-composer" />
    <jms:outbound-channel-adapter id="jms-in-error"
                                  destination="error-input-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>


<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>

Для полноты: когда tx откатывается по каналу ошибок, обе очереди ошибок получают сообщение в любом случае (как если бы исходящие адаптеры не участвовали в транзакции), и tx для нормального потока (когда ошибки не возникает) работает отлично.


person Giordano Biassoni    schedule 28.10.2016    source источник


Ответы (1)


Это правильно.

Потому что вы используете Polling Inbound Channel Adapter. Логика такая:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
  ...
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
   ...
                catch (Exception e) {
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException) e;
                    }
                    else {
                        throw new MessageHandlingException(new ErrorMessage(e), e);
                    }
                }
            }
        });

Если ваш TX является частью pollingTask прокси, как AOP TransactionInterceptor.

errorChannel является частью ErrorHandler this.taskExecutor. Следовательно, мы можем достичь errorChannel только тогда, когда выбросим исключение из pollingTask. Поскольку у нас там есть TX, его, конечно же, вернули.

Я хочу сказать, что процесс обработки ошибок в Polling Inbound Channel Adapter выполняется за пределами TX.

Подумайте о переходе на <int-jms:message-driven-channel-adapter>.

person Artem Bilan    schedule 28.10.2016
comment
Я планировал в какой-то момент переключиться на адаптер канала, управляемый сообщениями, в основном для производственной среды, но я надеялся, что смогу получить тот же результат с адаптером входящего канала и опросчиком. Как вы думаете, есть ли способ перенести процесс обработки ошибок внутри TX, даже с версией поллера? - person Giordano Biassoni; 28.10.2016
comment
Что ж, на самом деле JMS - это протокол потоковой передачи, поэтому было бы естественно всегда прослушивать адрес назначения. Редкий случай, когда нам нужен адаптер опроса для JMS. - person Artem Bilan; 28.10.2016
comment
Так что, в качестве альтернативы, я могу настроить управляемый сообщениями адаптер для медленного использования сообщений. Я имею в виду, могу ли я определить норму потребления? - person Giordano Biassoni; 28.10.2016
comment
Ну, вы можете поиграть с опцией concurrency не обрабатывать следующее сообщение, пока текущее не закончится, но, эх, я не понимаю, почему нужно снизить скорость чтения сообщений из JMS ... - person Artem Bilan; 28.10.2016
comment
В среде разработки у меня есть заполненная входящая очередь с ограниченным количеством сообщений (и у меня ограниченный контроль над ней). Итак, чтобы проводить тесты без опустошения очереди, мне нужно контролировать потребление. - person Giordano Biassoni; 28.10.2016
comment
Для этой цели вы можете использовать start()/stop() функции адаптера. Просто введите его в тестовую среду как Lifecycle и при необходимости измените его доступность. - person Artem Bilan; 28.10.2016
comment
Хотя теперь я переключился на адаптер, управляемый сообщениями, и он работает как шарм. Спасибо, Артем. - person Giordano Biassoni; 28.10.2016