Переход с RxJS4 на RxJS5 — реализация Observer

В одном из моих проектов у меня есть следующий код, и после перехода на RxJS5 Rx.Observer больше не определяется:

let index = 0;

let obsEnqueue = this.obsEnqueue = new Rx.Subject();

this.queueStream = Rx.Observable.create(obs => {
    var push = Rx.Observer.create(v => {             // ! error
        if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
            obs.next(v);
        }
    });
    return obsEnqueue.subscribe(push);
});

this.push = (v) => {
    obsEnqueue.next(v);
    index++;
};

это больше не работает, потому что Rx.Observer не определено

в руководстве по миграции:

https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md

он говорит:

Observer теперь интерфейс

Однако это не должно означать, что Rx.Observer, даже если это интерфейс, не должен иметь «статический» метод, называемый create.

В любом случае, Rx.Observer больше не существует. Я получаю эту ошибку:

TypeError: Cannot read property 'create' of undefined

Как я могу создать наблюдателя, чтобы получить результаты, аналогичные моему коду выше?


person Alexander Mills    schedule 30.12.2016    source источник


Ответы (4)


Из источника:

export interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

Observer<T> — это интерфейс с методами onNext, onCompleted и onError. Интерфейс — это всего лишь языковая конструкция. Он просто используется компилятором машинописного текста для проверки типов объектов, требующих Observer<T>. Он стирается при компиляции.

Класс Subscriber<T> реализует интерфейс Observer<T>. Это означает, что Subscriber<T> является фактическим конкретным классом с указанными выше методами.

Поэтому вместо этого вы используете var push = Rx.Subscriber.create(v => { [...].

Примечание:

В исходной реализации Rx интерфейсы были IObservable<T> и IObserver<T> и использовались методы расширения для обеспечения возможности композиции. Когда дело дошло до JS, у них должны были быть методы прототипа самого Observable/Observer, чтобы включить композицию, поэтому у самого класса были методы.

person Asti    schedule 30.12.2016
comment
Как вы думаете, можно ли упростить код так, как это делает Марк? - person Alexander Mills; 30.12.2016
comment
Нет, вам нужна прямая ссылка, чтобы быть наблюдателем. - person Asti; 31.12.2016
comment
в коде, который вы мне дали, я пытаюсь выяснить, в чем разница между использованием this.queueStream и this.obsEnqueue, приведет ли это к одному и тому же результату или нет? Почему? Пожалуйста, спасибо! - person Alexander Mills; 06.01.2017
comment
Вот связанный с этим вопрос: stackoverflow.com/questions/41499889/ - person Alexander Mills; 06.01.2017
comment
queueStream — это фактическая наблюдаемая величина, которую вы хотите передать. obsEnqueue — это тема, в которую вы отправляете элементы, чтобы она могла отслеживать все подписки без вашего участия. - person Asti; 06.01.2017
comment
Ты пользуешься скайпом или как? Такой асинхронный формат не работает. - person Asti; 06.01.2017
comment
да, на самом деле, теперь я понял, что вы имеете в виду, так что не волнуйтесь, да, у меня есть скайп, но гангауты проще - [email protected] - person Alexander Mills; 06.01.2017

Честно говоря, я не понимаю, что делает ваш код, но хотя класс Observer больше не существует, он был в основном заменен классом Subscriber, который используется почти так же, как Observer.

Он имеет статический метод Subscriber.create. См. https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32

Этот метод возвращает объект Subscriber, который впоследствии можно использовать, например obsEnqueue.subscribe(push);.

person martin    schedule 30.12.2016
comment
LOL Я на самом деле тоже не знаю, как это работает, я украл это у кого-то другого, и оно действительно работает, я просто не знаю, как :) Я думаю, что был бы способ упростить это, и Марк подходит к этому. - person Alexander Mills; 30.12.2016

Почему бы просто не встроить функцию onNext прямо в подписку?

this.queueStream = Rx.Observable.create(obs => {
    return obsEnqueue.subscribe(
      v => {
        if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
            obs.next(v);
        }
      }
    );
});
person Mark van Straten    schedule 30.12.2016
comment
переменная push не определена в вашем случае, похоже, нам нужен дескриптор этого - person Alexander Mills; 30.12.2016

Если бы я понял, что этот частичный код пытается сделать...
Я думаю, что нет,
я не вижу, как можно сделать это "упрощением".
возможно, что здесь нужно улучшить
заключается в том, чтобы сделать его более "повторно используемым",
сделать его модулем ?,
или, возможно, оператором Rx, если его еще нет...

это может быть попытка

/*
    "dependencies": {
        "rxjs": "^5.0.2"
    }
*/

import {Observable, Observer, Subject, Subscriber} from "rxjs";

export interface ICircularQueue<T> extends Observable<T> {
    push(value: T): void;
}

/**
* on every push use 'NEXT' observer/subscription,
* in the order they've been subscribed,
* cycling back to 1st subscription after last
*/
export function create<T>(): ICircularQueue<T> {

    let index = 0;

    let obsEnqueue = new Subject<T>();

    let queueStream = Observable.create((obs: Observer<T>) => {

        let push = Subscriber.create<T>(v => {
            // ! error ?
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        });

        return obsEnqueue.subscribe(push);
    });

    queueStream.push = (v: T) => {
        obsEnqueue.next(v);
        index++;
    };

    return queueStream;
}

потом базовый тест....

import * as CircularQueue from "./CircularQueue";
import * as assert from "assert";

const $in = (array: any[], x: any) => {
    for (let $x of array) {
        if ($x === x) { return true; }
    }
    return false;
};

describe("CircularQueue", () => {

    it("works", () => {
        let queue = CircularQueue.create();

        let result: number[] = [];
        queue.subscribe(x => {
            assert.ok($in([0, 4, 8], x));
            result.push(0);
        });
        queue.subscribe(x => {
            assert.ok($in([1, 5, 9], x));
            result.push(1);
        });
        queue.subscribe(x => {
            assert.ok($in([2, 6, 10], x));
            result.push(2);
        });
        queue.subscribe(x => {
            assert.ok($in([3, 7, 11], x));
            result.push(3);
        });

        for (let i = 0; i < 12; i++) {
            queue.push(i);
        }

        assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3");
    });
});
person Dan    schedule 31.12.2016