Я использую Spring Integration с поддержкой JTA через Atomikos и JMS, привязанные к разным Webshpere MQ как для входящих, так и для исходящих сообщений. Поток следующий:
- Адаптер входящего канала JMS получает сообщение
- некоторые преобразования
- Адаптер исходящего канала JMS в очередь вывода
- при возникновении ошибки
errorChannel
получает сообщение - Маршрутизатор типа исключения направляет необработанные ошибки в настраиваемую службу повторного выброса, а обработанные - в список-получателей-маршрутизатор, который отправляет их в 2 очереди ошибок
Моя проблема в том, что я хочу, чтобы транзакция была зафиксирована, даже если сообщение идет вниз по каналу errorChannel (в случае обработанного исключения и в очереди ошибок). Насколько я понимаю, откат должен происходить только в том случае, если Exception повторно генерируется (поэтому я повторно выбрасываю необработанные), но в моем случае откат транзакции происходит, как только сообщение достигает errorChannel (до того, как оно будет перенаправлено в другое место).
Что я делаю неправильно?
Конфигурация следует.
<jms:inbound-channel-adapter id="jms-in"
destination="input-queue"
connection-factory="inConnectionFactory"
channel="edi-inbound"
acknowledge="transacted">
<poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
>
<transactional timeout="${process.tx.timeout-sec:60}"/>
</poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>
<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
<object-to-string-transformer/>
<service-activator ref="inbound" method="service"/>
</chain>
<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
<transformer ref="edi2xml-converter"/>
<transformer ref="xml-mapper"/>
</chain>
<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
<service-activator ref="outbound-message-composer" />
</chain>
<channel id="outbound-channel">
<interceptors>
<beans:ref bean="outbound-interceptor" />
</interceptors>
</channel>
<recipient-list-router input-channel="outbound-channel">
<recipient channel="file-outbound"/>
<recipient channel="queue-outbound"/>
</recipient-list-router>
<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>
<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
directory="${output.directory}"
filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
delete-source-files="true" />
<!-- notification outbound flow -->
<channel id="errorChannel">
<interceptors>
<wire-tap channel="logger"/>
</interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>
<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
<mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>
<recipient-list-router input-channel="error-notification-channel">
<recipient channel="queue-outbound-error"/>
<recipient channel="queue-inbound-error"/>
</recipient-list-router>
<chain input-channel="queue-outbound-error">
<service-activator ref="outbound-error-composer" />
<jms:outbound-channel-adapter id="jms-out-error"
destination="error-output-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<chain input-channel="queue-inbound-error">
<service-activator ref="error-notif-composer" />
<jms:outbound-channel-adapter id="jms-in-error"
destination="error-input-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>
Для полноты: когда tx откатывается по каналу ошибок, обе очереди ошибок получают сообщение в любом случае (как если бы исходящие адаптеры не участвовали в транзакции), и tx для нормального потока (когда ошибки не возникает) работает отлично.