Я пытаюсь создать общий процесс обработки ошибок для всех обработчиков сообщений, которые будут использоваться в моем потоке SI. Это будет: 1. Повторить попытку при исключении соединения. 2. Остановите поток СИ с помощью прерывателя цепи. 3. Откатить неудачное сообщение в канал.
Я реализовал функции повторной попытки и разрыва цепи. Но я не могу откатить сообщение на канал. Я пытался использовать рекомендации по транзакциям. Но это не работает.
Вот код.
<bean id="retryAdvice"
class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="retryTemplate">
<bean class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="2000" />
<property name="multiplier" value="2" />
</bean>
</property>
</bean>
</property>
<property name="recoveryCallback">
<bean
class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="recoveryChannel" />
</bean>
</property>
<property name="retryStateGenerator">
<bean
class="org.springframework.integration.handler.advice.SpelExpressionRetryStateGenerator">
<constructor-arg value="headers['uniqueId']" />
</bean>
</property>
</bean>
<int:channel id="recoveryChannel" />
<int:transformer id="defaultTransformer" input-channel="recoveryChannel"
output-channel="loggerChannel" ref="defaultTransformer" method="transform">
</int:transformer>
<int:logging-channel-adapter id="loggerChannel"
level="INFO" log-full-message="true" auto-startup="true">
</int:logging-channel-adapter>
<bean id="defaultTransformer"
class="com.bestbuy.ingestion.foundation.core.util.DefaultTransformer" />
<bean id="circuitBreakerAdvice"
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" /> <!-- close after 2 failures -->
<property name="halfOpenAfter" value="60000" /> <!-- half open after 15 seconds -->
</bean>
<tx:advice id="txansactionAdvice" transaction-manager="transactionManager">
</tx:advice>
Какой тип диспетчера транзакций мне нужно использовать. Я могу использовать разные обработчики сообщений в разных источниках данных.
Вот как я добавляю эти советы в обработчики сообщений.
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
logger.error("called for bean id :: "+beanName+" with bean class "+bean.getClass().getName());
if(bean instanceof AbstractSimpleMessageHandlerFactoryBean){
logger.error("************ Bean "+beanName+" is instance of AbstractSimpleMessageHandlerFactoryBean **********");
}
if(bean instanceof ConsumerEndpointFactoryBean){
logger.error("Bean is of type ConsumerEndpointFactoryBean");
return populateRequestHandlerAdviceChain((ConsumerEndpointFactoryBean)bean);
}
if(bean instanceof AbstractSimpleMessageHandlerFactoryBean){
logger.error("Bean is of type AbstractSimpleMessageHandlerFactoryBean");
return populateRequestHandlerAdviceChain((AbstractSimpleMessageHandlerFactoryBean<?>)bean);
}
return bean;
}
private Object populateRequestHandlerAdviceChain(ConsumerEndpointFactoryBean bean){
ArrayList<Advice> list = new ArrayList<Advice>();
logger.error("Adding Retry Advice");
list.add((Advice)factory.getBean("retryAdvice"));
logger.error("Adding Cricuit Breaker Advice");
list.add((Advice)factory.getBean("circuitBreakerAdvice"));
logger.error("Adding Transactional Advice");
list.add((Advice)factory.getBean("txansactionAdvice"));
bean.setAdviceChain(list);
return bean;
}
Если бин типа ConsumerEndpointFactoryBean добавляю эти советы. Мне нужно управление транзакциями во всех этих обработчиках.
Advice
. Конфигурацияendpoint
.TransactionManager
должно бытьPlatformTransactionManager
- person Artem Bilan   schedule 09.06.2014