как использовать многоадресную рассылку сообщений с объектами поведения?

В общем, нам нужна функциональность предмета поведения. Но только при первой подписке мы должны отправить подписку на сервер в REST. И чтобы отправить отказ от подписки при последнем отказе от подписки, и все поздние подписавшиеся наблюдатели будут получать последний json, полученный с первого. Могу ли я сделать это с помощью операторов rxjs и как? или я использую пользовательский Obserbale?

в настоящее время настраиваемый код для этого таков:

public observable: Observable<TPattern> = new Observable((observer: Observer<TPattern>) => {
 this._observers.push(observer);
 if (this._observers.length === 1) {
  this._subscription = this.httpRequestStream$
    .pipe(
      map((jsonObj: any) => {
        this._pattern = jsonObj.Data;
        return this._pattern;
      })
    )
    .subscribe(
      (data) => this._observers.forEach((obs) => obs.next(data)),
      (error) => this._observers.forEach((obs) => obs.error(error)),
      () => this._observers.forEach((obs) => obs.complete())
    );
}
if (this._pattern !== null) {
  observer.next(this._pattern); // send last updated array
}
return () => {
  const index: number = this._observers.findIndex((element) => element === observer);
  this._observers.splice(index, 1);
  if (this._observers.length === 0) {
    this._subscription.unsubscribe();
    this._pattern = null; // clear pattern when unsubscribed
  }
};

});


person Shaul Naim    schedule 26.05.2020    source источник
comment
Не могли бы вы привести конкретный пример того, что вы ищете? например, "Учитывая X, я хочу достичь Y"   -  person Andrei Gătej    schedule 27.05.2020
comment
Чем отличается от стандартного BehaviorSubject? Вы можете использовать его с multicast(new BehaviorSubject)   -  person martin    schedule 27.05.2020


Ответы (1)


Похоже, вам нужен shareReplay(1), он поделится последним ответом со всеми подписчиками.

const stream$ = httpRequestStream$.pipe(
  shareReplay(1),
),

stream$.subscribe(); // sends the request and gets its result
stream$.subscribe(); // doesn't send it but gets cached result
stream$.subscribe(); // doesn't send it but gets cached result
stream$.subscribe(); // doesn't send it but gets cached result

person satanTime    schedule 27.05.2020
comment
Спасибо, звучит так, как будто это правильный путь, но что будет делать ставку на результат, когда httpRequestStream $ получит новый результат (он получает из websocket), все ли подписчики получат новый? а как в этой реализации работает отписка? также должны strem и httpRequestStream $ быть behaviorSubjects? - person Shaul Naim; 27.05.2020
comment
в случае, если он излучает, все будут уведомлены с новым значением. когда все отписались, подписка закрывается, если httpRequestStream $ завершено, то подписки тоже выполняются. и да, это может быть BehaviourSubject или любой другой Observable. - person satanTime; 27.05.2020
comment
ааа, спасибо, чувак. Чтобы быть в безопасности, я добавил текущий код, делая то, что я пытаюсь достичь, может ли shareReplay заменить его? - person Shaul Naim; 27.05.2020
comment
именно так. оберните this.httpRequestStream$ .pipe(shareReplay(1)), и все готово к работе. - person satanTime; 27.05.2020
comment
что вы имеете в виду, говоря обернуть this.httpRequestStream $? - person Shaul Naim; 27.05.2020
comment
использовать this.wrap$ = this.httpRequestStream$.pipe(shareReplay(1));, потому что если вы вызовете this.httpRequestStream$.pipe(shareReplay(1)) несколько раз, он создаст несколько повторов вместо одного. - person satanTime; 27.05.2020
comment
и AFAYK эта строка, this.wrap $ = this.httpRequestStream $ .pipe (shareReplay (1)); соответствует ли опубликованному мной коду? нет необходимости в дальнейшей логике? для отписки например и т.д ... - person Shaul Naim; 27.05.2020
comment
Да, если вы сделали это внутри httpRequestStream $ - тогда ничего лишнего делать не надо. - person satanTime; 27.05.2020