У меня есть следующая конфигурация для создания двух каналов (с помощью JmsChannelFactoryBean):
@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue.DLQ");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
Что-то.очередь настроена таким образом, чтобы помещать мертвую букву в что-то.очередь.DLQ. Я использую в основном Java DSL для настройки приложения, и, если возможно, хотел бы сохранить это.
Случай: сообщение берется из jmsChannel, помещается в исходящий шлюз sftp, если возникает проблема с отправкой файла, сообщение помещается обратно в jmsChannel как не доставленное. После нескольких попыток он оформляется как отравление и помещается в something.queue.DLQ.
- Возможно ли получить информацию о канале ошибки, когда это произойдет?
- Как лучше всего обрабатывать ошибки при использовании каналов сообщений, поддерживаемых JMS?
ИЗМЕНИТЬ 2
Поток интегрирования определяется как:
IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
Где filesToProcessChannel — это поддерживаемый JMS канал, а исходящий шлюз определяется как:
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(errorHandlingAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
Я пытаюсь получить исключение, используя совет:
@Bean
public Advice errorHandlingAdvice() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(1);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
return advice;
}
Это правильный путь?
ИЗМЕНИТЬ 3
Определенно что-то не так с SFTPOutboundGateway и советами (или со мной:/): я использовал следующий совет из весенней ссылки на интеграцию:
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
И когда я использую:
return IntegrationFlows.from(filesToProcessChannel)
.handle((GenericHandler<File>) (payload, headers) -> {
if (payload.equals("x")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, spec -> spec.advice(expressionAdvice()))
Он вызывается, и я получаю сообщение об ошибке (и это ожидается), но когда я пытаюсь использовать:
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
Совет не вызывается, а сообщение об ошибке возвращается в JMS.
Приложение использует Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE.
ИЗМЕНИТЬ 4
Мне удалось решить проблему с советом, используя следующую конфигурацию, но до сих пор не понимаю, почему спецификация обработчика не будет работать:
@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
...
) {
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway)
...
.log(LoggingHandler.Level.INFO)
.get();
}
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(expressionAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}