У меня есть приложение spring-webflux, которое должно получать сообщения от rabbitMQ. В предыдущих приложениях, когда я НЕ использовал spring-webflux, я смог:
- Настройте политику повтора при объявлении очереди
- Настройте прослушиватель кролика, используя аннотацию @RabbitListener
- Запустите повторную попытку, создав исключение в функции-обработчике.
В spring-webflux я не могу выдать ошибку, у меня просто MonoError, как мне вызвать повторную попытку?
Мой код выглядит примерно так в настоящее время
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // want to retry if downstream app is down
.subscribe();
}
}
РЕДАКТИРОВАТЬ
Теперь я понял, что это возможно. Например, если клиентский код возвращает Mono<Exception>
, это вызовет повторные попытки. Точно так же я мог бы условно инициировать повторные попытки сопоставления с Mono<Exception>
. Например, если я хочу инициировать повторную попытку, когда продукт из сообщения не существует, я мог бы сделать следующее.
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("my exception")))
.then(...) // carry on if it does exist