Как поймать/обнаружить исключения в многопоточной карте/уменьшить с помощью Reactor framework 2.x?

Я играл с кодом этот ответ и он работает без сбоев. Однако если возникает исключение, код вызывающего объекта не перехватывает его.

Как исключение фиксируется в потоках реактора 2.0? Что я хочу сделать, так это: если возникает исключение, потоковая обработка должна быть остановлена. Мне нужно создать исключение в потоке вызывающего абонента (тот, который создал пар в первую очередь).

List<Map<String, Object>> data = readData(); 

Streams.from(data)
       .flatMap(m -> Streams.just(m)
                            .dispatchOn(Environment.cachedDispatcher()) 
                            .map(ignored -> {throw new RuntimeException("kaboom!");}))
       .buffer() 
       .consume(s -> System.out.println("s: " + s)); 
// the exception is not thrown and there is not opportunity to deal with it.

person pabloa98    schedule 16.01.2015    source источник


Ответы (1)


В Reactor вам просто нужно обернуть исключения и вернуть их как Flux.error()

Затем вы можете обрабатывать их в методах onErrorXXX (например, onErrorResume)

Узнать больше:

https://projectreactor.io/docs/core/release/reference/#error.handling

person pixel    schedule 15.02.2019