Как запустить повторные попытки при использовании Spring AMQP @RabbitListener в реактивном приложении Spring-Webflux

У меня есть приложение spring-webflux, которое должно получать сообщения от rabbitMQ. В предыдущих приложениях, когда я НЕ использовал spring-webflux, я смог:

  1. Настройте политику повтора при объявлении очереди
  2. Настройте прослушиватель кролика, используя аннотацию @RabbitListener
  3. Запустите повторную попытку, создав исключение в функции-обработчике.

В spring-webflux я не могу выдать ошибку, у меня просто MonoError, как мне вызвать повторную попытку?

Мой код выглядит примерно так в настоящее время

@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // want to retry if downstream app is down
      .subscribe();
  }
}

РЕДАКТИРОВАТЬ

Теперь я понял, что это возможно. Например, если клиентский код возвращает Mono<Exception>, это вызовет повторные попытки. Точно так же я мог бы условно инициировать повторные попытки сопоставления с Mono<Exception>. Например, если я хочу инициировать повторную попытку, когда продукт из сообщения не существует, я мог бы сделать следующее.

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist

person Joseph Berry    schedule 29.07.2020    source источник


Ответы (1)


Использование Reactor с нереактивным контейнером прослушивателя сопряжено со многими проблемами.

  1. Вы должны использовать MANUAL acks и ack/nack доставки после завершения реактивного потока.
  2. Вы должны использовать механизмы повторной попытки реактора.

Рассмотрите возможность просмотра проекта https://github.com/reactor/reactor-rabbitmq вместо Весенний АМКП. Когда-нибудь в будущем мы надеемся построить реактивные @RabbitListener, но их пока нет.

person Gary Russell    schedule 29.07.2020
comment
Я обновил свой вопрос решением, поскольку теперь это возможно - person Joseph Berry; 06.08.2020