Как обрабатывать ошибки JmsChannelFactoryBean, есть ли возможность использования пользовательского канала ошибок?

У меня есть следующая конфигурация для создания двух каналов (с помощью JmsChannelFactoryBean):

@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue.DLQ");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

Что-то.очередь настроена таким образом, чтобы помещать мертвую букву в что-то.очередь.DLQ. Я использую в основном Java DSL для настройки приложения, и, если возможно, хотел бы сохранить это.

Случай: сообщение берется из jmsChannel, помещается в исходящий шлюз sftp, если возникает проблема с отправкой файла, сообщение помещается обратно в jmsChannel как не доставленное. После нескольких попыток он оформляется как отравление и помещается в something.queue.DLQ.

  1. Возможно ли получить информацию о канале ошибки, когда это произойдет?
  2. Как лучше всего обрабатывать ошибки при использовании каналов сообщений, поддерживаемых JMS?

ИЗМЕНИТЬ 2

Поток интегрирования определяется как:

IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)

Где filesToProcessChannel — это поддерживаемый JMS канал, а исходящий шлюз определяется как:

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(errorHandlingAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}

Я пытаюсь получить исключение, используя совет:

@Bean
public Advice errorHandlingAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(1);
    retryTemplate.setRetryPolicy(retryPolicy);
    advice.setRetryTemplate(retryTemplate);
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
    return advice;
}

Это правильный путь?

ИЗМЕНИТЬ 3

Определенно что-то не так с SFTPOutboundGateway и советами (или со мной:/): я использовал следующий совет из весенней ссылки на интеграцию:

@Bean
public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}

И когда я использую:

return IntegrationFlows.from(filesToProcessChannel)
            .handle((GenericHandler<File>) (payload, headers) -> {
                if (payload.equals("x")) {
                    return null;
                }
                else {
                    throw new RuntimeException("some failure");
                }
            }, spec -> spec.advice(expressionAdvice()))

Он вызывается, и я получаю сообщение об ошибке (и это ожидается), но когда я пытаюсь использовать:

return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway, spec -> spec.advice(expressionAdvice()))

Совет не вызывается, а сообщение об ошибке возвращается в JMS.

Приложение использует Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE.

ИЗМЕНИТЬ 4

Мне удалось решить проблему с советом, используя следующую конфигурацию, но до сих пор не понимаю, почему спецификация обработчика не будет работать:

@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
                      ...
) {
    return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway)
            ...
            .log(LoggingHandler.Level.INFO)
            .get();
}

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(expressionAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}


@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}

person Błażej Karwowski    schedule 10.03.2018    source источник


Ответы (1)


Поскольку движение в DLQ выполняет брокер, у приложения нет механизма логирования ситуации — оно даже не в курсе, что это произошло.

Вам придется самостоятельно перехватывать исключения и публиковать сообщение в DLQ после некоторого количества попыток (заголовок JMSXDeliveryCount) вместо использования политики брокера.

ИЗМЕНИТЬ

Добавьте Advice к шагу .handle().

.handle(outboundGateway, e -> e.advice(myAdvice))

Где myAdvice реализует MethodInterceptor.

В методе invoke после сбоя вы можете проверить заголовок счетчика доставки и, если он превышает ваш порог, опубликовать сообщение в DLQ (например, отправить его на другой канал, на который подписан исходящий адаптер JMS) и зарегистрировать ошибку; если порог не был превышен, просто вернуть результат invocation.proceed() (или перегенерировать исключение).

Таким образом, вы контролируете публикацию в DLQ, а не брокер. Вы также можете добавить дополнительную информацию, например, исключение, в заголовки.

ИЗМЕНИТЬ2

Вам нужно что-то вроде этого

public class MyAdvice implements MethodInterceptor {

    @Autowired
    private MessageChannel toJms;

    public Object invoke(MethodInvocation invocation) throws Throwable {
        try {
            return invocation.proceed();
        }
        catch Exception(e) {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
            if (redeliveries != null && redeliveries > 3) {
                this.toJms.send(message); // maybe rebuild with additional headers about the error
            }
            else {
                throw e;
            }
        }
    }
}

(должно быть близко, но я не проверял). Предполагается, что ваш брокер заполняет этот заголовок.

person Gary Russell    schedule 12.03.2018
comment
Я добавил идею конфигурации в свой предыдущий пост. - person Błażej Karwowski; 12.03.2018
comment
Я попробовал что-то простое, например: IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway, e -> e.advice((MethodInterceptor) invocation -> { Object continue = invocation.proceed(); return continue; })) Я поставил точку останова на объекте continue = invocation.proceed(); но это не остановится. Что я делаю неправильно? - person Błażej Karwowski; 12.03.2018
comment
Не размещайте код в комментариях; трудно читать; вместо этого отредактируйте вопрос и добавьте комментарий, чтобы сказать, что вы это сделали. Смотрите мою вторую правку. - person Gary Russell; 12.03.2018
comment
Ваш первый ответ был правильным, совет по использованию обходного пути - должен работать. Я сделал тест с GenericHandler -> совет работает как положено, тест с шлюзом -> не работает, совет не вызывается, не знаю причину. Я добавил тот код, который работает, и тот, который не работает. И извините за код в комментарии, немного поторопился с ответом. - person Błażej Karwowski; 12.03.2018
comment
Ваш `.handle(outboundGateway, spec -> spec.advice(expressionAdvice()))` будет работать, если вы не объявите outboundGateway() как @Bean. Просто потому, что вы не можете изменить состояние компонента после его инициализации. Или иначе было бы лучше, как вы, в конце концов, - инициализируйте bean-компонент и используйте его в DSL только в качестве эталона. Вместо этого есть фабрика Sftp.outboundGateway(). - person Artem Bilan; 13.03.2018