Как использовать RxJS и Observables для выполнения http-запроса только после запуска нескольких других событий?

У меня возникли проблемы с объединением испускаемых значений двух разных Observable для использования в HTTP-запросе, а затем с возвратом Observable запроса для использования клиентами.

Немного предыстории: я работаю над расширением для Twitch. Частью их экосистемы является то, что расширения получают информацию о среде через обратные вызовы событий. Интересующие меня находятся по адресам window.Twitch.ext.onAuthorized() и window.Twitch.ext.configuration.onChanged() (если интересно, подробнее см. здесь: https://dev.twitch.tv/docs/extensions/reference#helper-extensions).

При совершении вызовов на мой бэкэнд мне нужна информация из обоих вышеупомянутых событий. Они не будут часто меняться (если вообще когда-либо), но я не могу совершать звонки, пока они оба не будут доступны, и я хочу получить самое последнее предоставленное значение при совершении звонков. Похоже, BehaviorSubjects идеально подходит для этого:

export class TwitchService {
  private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
  private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
  private helper: any;

  constructor(private window: Window) {
    this.helper = this.window['Twitch'].ext;
    this.helper.onAuthorized((auth: TwitchAuth) => {
      this.authSubject.next(auth);
    });
    this.helper.configuration.onChanged(() => {
      this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
    });
  }

  onAuthorized(): Observable<TwitchAuth> {
    return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
  }

  onConfig(): Observable<TwitchConfig> {
    return this.configSubject.asObservable().pipe(filter((config) => !!config));
  }
}

Эта модель хорошо работает для частей приложения, которые подписываются на один из этих двух Observable. Моя проблема в том, что я не могу найти рабочий способ объединить их и использовать последние выданные значения из обоих для создания одноразового Observable для http-запросов.

Вот что у меня есть до сих пор:

type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;

export class BackendService {
  constructor(private http: HttpClient, private twitch: TwitchService) {}

  private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
    let subject = new Subject<T>();
    combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(), // because we only want to make the http request one time
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
    );
    return subject.asObservable();
  }

  exampleHttpRequest(): Observable<Response> {
    return this.httpWithTwitchState((auth, config) =>
      // this.url() and this.headers() are private functions of this service
      this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
    );
  }
}

И затем клиент этой службы должен иметь возможность делать http-запросы с помощью этого простого вызова, без необходимости знать что-либо о событиях или заботиться о том, когда они срабатывают:

this.backend.exampleHttpRequest().subscribe((resp) => {
  // do stuff with resp here...but this is never hit
}

Насколько я понимаю, combineLatest() должен выдавать новые значения всякий раз, когда любой из входных Observable выдает новое значение. Однако вызов f(auth, config) внутри map() никогда не запускается в моем приложении. Я протестировал его с точками останова, с console.logs и отслеживая вкладку «Сеть» в инструментах отладчика браузера. Возможно, вызов first() отбрасывает его, но я не хочу повторять http-запрос, если события снова сработают, по очевидным причинам.

Заранее спасибо за любые советы или указатели!


person lettucemode    schedule 03.08.2020    source источник
comment
Несмотря на некоторое неидиоматическое использование RxJS, похоже, что он должен работать. Вы уверены, что обратные вызовы helper.onAuthorized и helper.configuration.onChanged действительно запускаются в TwitchService. Я могу представить себе случай, когда хелпер сгенерировал свои события до создания экземпляра TwitchService.   -  person amakhrov    schedule 03.08.2020
comment
@amakhrov хорошая идея - я проверил ее с некоторыми точками останова. Сначала срабатывает конструктор TwitchService, затем срабатывает httpWithTwitchState() (поскольку другой компонент выполняет HTTP-вызов в своем ngOnInit()), затем оба события срабатывают в TwitchService. Это совпадает с моим ожидаемым порядком событий. У вас есть что-нибудь, что я мог бы прочитать об идиоматическом использовании RxJS? Может быть, если я уберу это, проблема решится сама собой.   -  person lettucemode    schedule 04.08.2020


Ответы (2)


Вы можете представить concat как очередь в банкомате: следующая транзакция (подписка) не может начаться, пока не завершится предыдущая!

Пример: https://stackblitz.com/edit/typescript-3qsfgk?file=index.ts&devtoolsheight=100

Образец:

const mergeValue = concat(authSubject,configSubject) 

mergeValue.subscribe( (x) => {
  // do stuff with resp here
});
person karthicvel    schedule 03.08.2020
comment
Эй, спасибо за ваш ответ, но я не думаю, что concat подходит для моего варианта использования. Я пытаюсь скоординировать вывод двух событий, которые я не контролирую, когда они срабатывают, и делаю http-вызов только после того, как они оба сработали, используя данные каждого из них. - person lettucemode; 04.08.2020

Хорошо, оказывается, я упустил что-то очевидное. Я не подписывался на вызов combineLatest(), поэтому он никогда не выполнялся. После добавления subscribe() в конце это сработало нормально:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  let subject = new Subject<T>();
  combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(),
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
  ).subscribe(); // adding this fixed the problem
  return subject.asObservable();
} 

После этого я провел небольшое исследование о том, как отменить вложенность подписок в подобных случаях, что привело меня к оператору flatMap(). Итак, теперь функция httpWithTwitchState() выглядит так:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  return combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
    first(),
    map(([auth, config]) => f(auth, config)),
    flatMap((resp) => resp)
  );
}

Теперь это работает, как и ожидалось, и стало чище. Спасибо тем, кто нашел время, чтобы прочитать мой вопрос и/или ответ.

person lettucemode    schedule 04.08.2020
comment
Хороший улов! Кстати, это часть неидиоматического кода rxjs :). Нет необходимости создавать новую тему внутри httpWithTwitchState. И, кроме того, не нужно подписываться внутри оператора map() (что на самом деле плохо — map предназначен для преобразования данных, а не для побочных эффектов. - person amakhrov; 04.08.2020
comment
Вместо этого этот метод будет выглядеть так: return combineLatest([onAuth, onConfig]).pipe(first(), switchMap(([auth, conf]) => f(auth, conf))) - person amakhrov; 04.08.2020