Преодоление Reactor's Flux от gRPC StreamObserver

Я хочу создать Reactor Flux из gRPC StreamObserver. Это необходимо сделать до тех пор, пока StreamObserver не реализует соответствующие интерфейсы изначально (см., например, эта проблема).

То, что я придумал, примерно похоже на следующее:

final StreamObserver<ProtoResponse>[] streamObserverArray = new  StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
        @Override
        public void onNext(ProtoResponse value) {
            final Response response = convertFromProto(value);
            sink.next(response);
        }

        @Override
        public void onError(Throwable throwable) {
            sink.error(throwable);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    });
myFlux
    .doOnError(throwable -> {/* actual logic in here */}) //
    .doOnComplete(() -> {/* actual logic in here */}) //
    .doOnCancel(() -> {/* actual logic in here */}) //
    .parallel() //
    .runOn(Schedulers.parallel()) //
    .doOnNext(/* actual heavy lifting logic in here */) //
    .map(/* ... */) //
    .sequential() //
    .doOnNext(/* ...*/) //
    .subscribe(); // needed to start the actual processing of the events on this Flux

MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);

Основная идея, почему я хочу использовать здесь Reactor, заключается в том, чтобы распределить «тяжелую работу» на несколько параллельных потоков, а не делать это в потоках запросов gRPC.

Я вижу несколько проблем с подходом, как это сделано выше:

  • Мне действительно не нравится обходной путь с массивом StreamObserver[]
  • Сначала мне нужно создать полный поток, потому что, если я не закончу его с помощью .subscribe(), StreamObserver может стать null, когда gRPC начнет обмениваться данными (так называемое состояние гонки).
  • Я не уверен, работает ли обратное давление так, как задумано (хотя в настоящее время это не является моей главной заботой).

Итак, мои вопросы будут такими: каков наилучший/предпочтительный способ перехода от gRPC StreamObserver к Reactor Flux? Есть ли передовой опыт?


person Dominik Sandjaja    schedule 23.10.2017    source источник


Ответы (2)


В настоящее время есть более простое решение:

https://github.com/salesforce/reactive-grpc.

Он поддерживает мостовое соединение gRPC как с Reactor, так и с RxJava 2.

person Stefan L    schedule 24.12.2018
comment
Спасибо, что разместили это здесь. Мы уже перешли на использование этой библиотеки во всех наших продуктах и ​​довольны этим! - person Dominik Sandjaja; 24.12.2018

После еще немного возни и понимания всего реактивного материала немного лучше, я пришел к следующему решению:

/**
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
*/
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {

    private Subscriber<? super Long> subscriber;

    @Override
    public void onNext(Long l) {
        subscriber.onNext(l);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onCompleted() {
        subscriber.onComplete();
    }

    @Override
    public void subscribe(Subscriber<? super Long> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new BaseSubscriber() {});
    }
}

// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);
person Dominik Sandjaja    schedule 27.10.2017
comment
привет, я просто хочу знать, почему longFlux.subscribe() должен произойти перед выполнением запроса gRPC. Спасибо. - person trjade; 21.05.2020
comment
@trjade: я действительно не помню, это было почти три года назад, и, как упоминалось в другом ответе, мы заменили наше решение, созданное вручную, библиотекой reactive-grpc от Salesforce. - person Dominik Sandjaja; 22.05.2020