Как преобразовать поток, читаемый узлом, в наблюдаемый RX

Если у меня есть поток Node js, скажем, из чего-то вроде process.stdin или из fs.createReadStream, как я могу преобразовать его в поток RxJs Observable, используя RxJs5?

Я вижу, что RxJs-Node имеет метод fromReadableStream, но похоже, что его нет. был обновлен почти через год.


person JuanCaicedo    schedule 08.01.2017    source источник
comment
так работает или? Кого волнует, как часто он обновляется, если он работает   -  person smnbbrv    schedule 08.01.2017
comment
@smnbbrv Без сомнения, он отлично работает, но это RxJS4, и он несовместим с RxJS5.   -  person cartant    schedule 08.01.2017
comment
Вы можете посмотреть источник чтобы увидеть, что потребуется, чтобы преобразовать его самостоятельно - реализация довольно мала.   -  person cartant    schedule 08.01.2017


Ответы (5)


Для тех, кто ищет это, следуя рекомендации Марка, я адаптировал реализацию rx-node fromStream для rxjs5.

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

Обратите внимание, что он по своей сути нарушает все функции обратного давления потоков. Observables — это технология push. Все входные фрагменты будут прочитаны и переданы наблюдателю как можно быстрее. В зависимости от вашего случая, это может быть не лучшим решением.

person Quentin Roy    schedule 11.02.2017
comment
Я не проверял это с тех пор, как ушел от того, над чем работал, но если кто-то еще это сделал, и это сработало, я приму этот ответ :) - person JuanCaicedo; 11.02.2017
comment
Я использую его, и пока он работает хорошо. Я не тестировал его, думал. - person Quentin Roy; 12.02.2017

Следующее должно работать как для версии 4, так и для версии 5 (отказ от ответственности не проверено):

fromStream: function (stream, finishEventName, dataEventName) {
    stream.pause();

    finishEventName || (finishEventName = 'end');
    dataEventName || (dataEventName = 'data');

    return Observable.create(function (observer) {

      // This is the "next" event
      const data$ = Observable.fromEvent(stream, dataEventName);

      // Map this into an error event
      const error$ = Observable.fromEvent(stream, 'error')
        .flatMap(err => Observable.throw(err));

      // Shut down the stream
      const complete$ = Observable.fromEvent(stream, finishEventName);

      // Put it all together and subscribe
      const sub = data$
        .merge(error$)
        .takeUntil(complete$)
        .subscribe(observer);

      // Start the underlying node stream
      stream.resume();

      // Return a handle to destroy the stream
      return sub;
    })

    // Avoid recreating the stream on duplicate subscriptions
    .share();
  },
person paulpdaniels    schedule 09.01.2017

Приведенные выше ответы будут работать, хотя противодавление не поддерживается. Если вы пытаетесь прочитать большой файл с помощью createReadStream, они прочитают его целиком в памяти.

Вот моя реализация с поддержкой противодавления: rxjs-stream

person Dani    schedule 24.01.2021

Реализация RxJs-Node основана на RxJs4, но может быть перенесена на RxJs5 без особых усилий https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52

person Mark van Straten    schedule 08.01.2017
comment
спасибо, похоже, вам придется реализовать преобразование? - person JuanCaicedo; 09.01.2017
comment
Кажется, версии rxjs5 еще нет, так что пока да - person Mark van Straten; 09.01.2017

Поскольку потоки Node v11.14.0 поддерживают for await https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator

это означает, что вы можете передать поток оператору from().

Под капотом rxjs(v7.x.x) вызовет fromAsyncIterable(), который вернет Observable

person Vladimir Popov    schedule 15.06.2021