Я играл с кодом этот ответ и он работает без сбоев. Однако если возникает исключение, код вызывающего объекта не перехватывает его.
Как исключение фиксируется в потоках реактора 2.0? Что я хочу сделать, так это: если возникает исключение, потоковая обработка должна быть остановлена. Мне нужно создать исключение в потоке вызывающего абонента (тот, который создал пар в первую очередь).
List<Map<String, Object>> data = readData();
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher())
.map(ignored -> {throw new RuntimeException("kaboom!");}))
.buffer()
.consume(s -> System.out.println("s: " + s));
// the exception is not thrown and there is not opportunity to deal with it.