откатить сообщения в канал от обработчиков

Я пытаюсь создать общий процесс обработки ошибок для всех обработчиков сообщений, которые будут использоваться в моем потоке SI. Это будет: 1. Повторить попытку при исключении соединения. 2. Остановите поток СИ с помощью прерывателя цепи. 3. Откатить неудачное сообщение в канал.

Я реализовал функции повторной попытки и разрыва цепи. Но я не могу откатить сообщение на канал. Я пытался использовать рекомендации по транзакциям. Но это не работает.

Вот код.

<bean id="retryAdvice"
    class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="2000" />
                    <property name="multiplier" value="2" />
                </bean>
            </property>
        </bean>
    </property>
    <property name="recoveryCallback">
        <bean
            class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="recoveryChannel" />
        </bean>
    </property>
    <property name="retryStateGenerator">
        <bean
            class="org.springframework.integration.handler.advice.SpelExpressionRetryStateGenerator">
            <constructor-arg value="headers['uniqueId']" />
        </bean>
    </property>
</bean>

<int:channel id="recoveryChannel" />
<int:transformer id="defaultTransformer" input-channel="recoveryChannel"
    output-channel="loggerChannel" ref="defaultTransformer" method="transform">
</int:transformer>

<int:logging-channel-adapter id="loggerChannel"
    level="INFO" log-full-message="true" auto-startup="true">
</int:logging-channel-adapter>

<bean id="defaultTransformer"
    class="com.bestbuy.ingestion.foundation.core.util.DefaultTransformer" />

<bean id="circuitBreakerAdvice"
    class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
    <property name="threshold" value="2" />             <!-- close after 2 failures -->
    <property name="halfOpenAfter" value="60000" />     <!-- half open after 15 seconds -->
</bean>

<tx:advice id="txansactionAdvice" transaction-manager="transactionManager">
</tx:advice>

Какой тип диспетчера транзакций мне нужно использовать. Я могу использовать разные обработчики сообщений в разных источниках данных.

Вот как я добавляю эти советы в обработчики сообщений.

public Object postProcessBeforeInitialization(Object bean, String beanName)
        throws BeansException {

    logger.error("called for bean id :: "+beanName+" with bean class "+bean.getClass().getName());
    if(bean instanceof AbstractSimpleMessageHandlerFactoryBean){
        logger.error("************ Bean "+beanName+" is instance of AbstractSimpleMessageHandlerFactoryBean **********");
    }
    if(bean instanceof ConsumerEndpointFactoryBean){
        logger.error("Bean is of type ConsumerEndpointFactoryBean");
        return populateRequestHandlerAdviceChain((ConsumerEndpointFactoryBean)bean);
    }
    if(bean instanceof AbstractSimpleMessageHandlerFactoryBean){
        logger.error("Bean is of type AbstractSimpleMessageHandlerFactoryBean");
        return populateRequestHandlerAdviceChain((AbstractSimpleMessageHandlerFactoryBean<?>)bean);
    }
    return bean;
}
private Object populateRequestHandlerAdviceChain(ConsumerEndpointFactoryBean bean){
    ArrayList<Advice> list = new ArrayList<Advice>();
    logger.error("Adding Retry Advice");
    list.add((Advice)factory.getBean("retryAdvice"));
    logger.error("Adding Cricuit Breaker Advice");
    list.add((Advice)factory.getBean("circuitBreakerAdvice"));
    logger.error("Adding Transactional Advice");
    list.add((Advice)factory.getBean("txansactionAdvice"));
    bean.setAdviceChain(list);
    return bean;
}

Если бин типа ConsumerEndpointFactoryBean добавляю эти советы. Мне нужно управление транзакциями во всех этих обработчиках.


person Karthikeyan Kesavaraj    schedule 09.06.2014    source источник
comment
Покажите, пожалуйста, как вы применяете эти Advice. Конфигурация endpoint. TransactionManager должно быть PlatformTransactionManager   -  person Artem Bilan    schedule 09.06.2014
comment
Привет Артем, Обновил вопрос с кодом.   -  person Karthikeyan Kesavaraj    schedule 09.06.2014


Ответы (1)


Прежде всего: поскольку ваш txansactionAdvice вложен в retryAdvice, вы выполняете откат здесь при каждой повторной попытке.

С другой стороны, непонятно, почему вы применяете circuitBreakerAdvice для каждой повторной попытки. Я бы сказал, что эту скороговорку лучше использовать «вокруг» retryAdvice.

И txansactionAdvice должен быть сверху. Итак, это может выглядеть так:

txansactionAdvice
circuitBreakerAdvice
retryAdvice

И еще один момент: ваша транзакция не будет откатана, потому что вы используете recoveryCallback, который просто отправляет ErrorMessage и душит Exception.

HTH, и вы измените свое мнение по этому вопросу.

person Artem Bilan    schedule 09.06.2014
comment
Спасибо Артем Это было полезно. и теперь я не вижу потери сообщений. - person Karthikeyan Kesavaraj; 10.06.2014