Я хочу создать Reactor Flux из gRPC StreamObserver. Это необходимо сделать до тех пор, пока StreamObserver не реализует соответствующие интерфейсы изначально (см., например, эта проблема).
То, что я придумал, примерно похоже на следующее:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
Основная идея, почему я хочу использовать здесь Reactor, заключается в том, чтобы распределить «тяжелую работу» на несколько параллельных потоков, а не делать это в потоках запросов gRPC.
Я вижу несколько проблем с подходом, как это сделано выше:
- Мне действительно не нравится обходной путь с массивом
StreamObserver[]
- Сначала мне нужно создать полный поток, потому что, если я не закончу его с помощью
.subscribe()
,StreamObserver
может статьnull
, когда gRPC начнет обмениваться данными (так называемое состояние гонки). - Я не уверен, работает ли обратное давление так, как задумано (хотя в настоящее время это не является моей главной заботой).
Итак, мои вопросы будут такими: каков наилучший/предпочтительный способ перехода от gRPC StreamObserver к Reactor Flux? Есть ли передовой опыт?