Зависимые вложенные наблюдаемые

У меня есть два метода, в которых я выполняю некоторую асинхронную операцию, но я не хочу, чтобы метод 2 выполнялся до тех пор, пока ответ от метода 1 не будет положительным, чтобы указать, что мы можем двигаться дальше.

Итак, вот что я пробовал:

Способ 1:

private method1(): Observable<any> {
  return new Observable(() => {
    executingSomething();
    this.anotherSubscription.pipe(
      map(x => {
        console.log('Control is not reaching here');
        Also, how to indicate the caller that method2 can be executed?
        I can't return anything since I'm already returning the new Observable, above.
      })
    );
  });
}

Звонивший:

concat(this.method1(), this.method2()).subscribe();

Проблема: anotherSubscription даже не выполняется, и я не могу придумать, как передать ответ из anotherSubscription вызывающей стороне.

Я чувствую, что здесь я не использую наблюдаемые в правильном смысле, но, кажется, нигде ничего не могу найти.


person Suyash Gupta    schedule 23.12.2020    source источник
comment
Это также тесно связано: stackoverflow .com/questions/50452947/.   -  person Avius    schedule 23.12.2020


Ответы (3)


Я сделал некоторые предположения относительно вашего кода до написания решения. Если предположения неверны, то и ответ может быть таким же.

  1. executingSomething() – это синхронный метод с несвязанной функциональностью, за исключением того, что вы хотите, чтобы все это завершилось ошибкой, если возникнет исключение.
  2. anotherSubscription это не Subscribtion, а Observable (это разные вещи).

Вот как я решил бы вашу проблему:

class SomeClass {
  private method1(): Observable<any> {
    try {
      executingSomething();
    } catch (err) {
      // This would return an errorred observable, which is great, since
      // you can still subscribe to it (no need to change the return type
      // of this method).
      return throwError(err);
    }

    return this.anotherSubscription.pipe(
      tap(() => {
        console.log('Controls is reaching here!');
      }),
    );
  }

  private method2(): Observable<any> {
    // This can be whatever obervable, using of(null) for demo purposes.
    return of(null);
  }

  private parentMethod() {
    this.method1()
      .pipe(
        switchMap(valueFromAnotherSubscription => {
          // UPD1 - Implement your custom check here. This check will determine
          // whether `method2` will be called.
          if (valueFromAnotherSubscription === targetValue) {
            return this.method2();
          } else {
            // UPD1 - If the check evaluates to `false`, reemit the same value
            // using the `of` operator.
            return of(valueFromAnotherSubscription);
          }
        }),
      )
      .subscribe(() => {
        console.log('Done!');
      });
  }
}

Ключевым оператором здесь, как правильно указал Кевин, является switchMap. Каждый раз, когда наблюдаемое (anotherSubscription) испускается, switchMap отменяет его и подписывается на другое наблюдаемое (независимо от того, что возвращается из method2).

Это происходит последовательно, поэтому method2 будет подписан на после method1 испускания. Если method1 сработает, весь конвейер выйдет из строя. Вы также можете filter просмотреть результаты method1, чтобы решить, хотите ли вы перейти на method2.

Кроме того, в большинстве случаев построение наблюдаемых с использованием new Observable, вероятно, не требуется. У меня есть довольно большое приложение RxJS, и я никогда не использовал его до сих пор.

Обновление 1

См. код, обозначенный комментариями UPD1.

Имейте в виду, что если anotherSubscription выдает ошибку, функция switchMap все равно вызываться не будет.

person Avius    schedule 23.12.2020
comment
По какой-то конкретной причине вы использовали tap вместо map? - person Suyash Gupta; 23.12.2020
comment
Да, цель map — возвращать разные значения для каждого испускаемого значения. И наоборот, tap используется для побочных эффектов. Я считаю оператор log побочным эффектом, поэтому tap больше подходит. Но вы можете сделать это и внутри оператора map — это сработает, просто не забудьте вернуть свое значение:] - person Avius; 23.12.2020
comment
Еще кое-что. Как отфильтровать ответ от метода 1? На самом деле, как я могу что-то вернуть (например, логическое значение, указывающее, следует ли продолжать дальше или нет), когда я возвращаю Observable<any>? Я попытался вернуть of(false), но я не могу использовать этот false, пока не подпишусь на него, и мне нужно будет сделать это в switchMap(res => { if (res) { } ), иначе будет выполнен метод2. - person Suyash Gupta; 23.12.2020
comment
Итак, что мне ясно, так это то, что всякий раз, когда method1 выдает правильное значение, вы хотите, чтобы подписчик получил значение от method2. Что не ясно, так это то, что должно произойти, когда method1 выдает неверное значение. Я вижу два возможных сценария, подскажите какой вам нужен: а) когда method1 выдает плохое значение, вы хотите полностью игнорировать такие значения, чтобы подписчик вообще не был предупрежден; Б) когда method1 выдает неверное значение, вы просто хотите, чтобы подписчик получил это значение, пропуская method2. Надеюсь разница понятна. Я обновил ответ, используя вариант B. - person Avius; 23.12.2020
comment
Ах, я смотрел на какой-то неправильный вывод консоли и запутался. Спасибо за помощь. Это работает. - person Suyash Gupta; 23.12.2020

Вы можете явно вернуть значение serviceN в serviceN+1. Вот идея:

private setupStuff() {
  this.initRouteParams()
    .pipe(
      switchMap(serviceId => {
        return zip(of(serviceId), this.getFileInfo(serviceId))
      }),
      switchMap(([serviceId, filename]) => {
        return zip(of(serviceId), of(filename), this.getExistingFile(serviceId, filename))
      })
    )
    .subscribe(([serviceId, filename, response]) => {
      console.log(serviceId, filename, response);
    })
}
person Kevin Zhang    schedule 23.12.2020
comment
То, что я спрашиваю, и то, что вы предлагаете, совершенно разные. Я хочу вернуть некоторый ответ из наблюдаемого, который вложен в другой наблюдаемый. И вдобавок ко всему, внутреннее наблюдаемое даже не выполняется. - person Suyash Gupta; 23.12.2020
comment
Просто напишите один пример, удовлетворит ли это ваше требование? stackblitz.com/edit/angular-ivy- rn5kfe?file=src/app/ - person Kevin Zhang; 23.12.2020

Я не хочу, чтобы метод 2 выполнялся до тех пор, пока ответ от метода 1 не будет положительным.

Если вы просто хотите, чтобы излучение obs2 запускалось после излучения obs1 (независимо от того, что излучает obs1), просто используйте switchMap:

fromEvent(document, 'click')
  .pipe(
    switchMap(() => of([1]))
  )
  .subscribe(console.log); <--- will emit [1] only after the user clicks.

Если вам нужно, чтобы obs2 выдавал только после того, как obs1 выдает определенное значение, используйте switchMap с оператором фильтра (https://www.learnrxjs.io/learn-rxjs/operators/filtering/filter):

const obs$ = of(true); 
const obs2$ = of([1,2,3]);

    obs$.pipe(
        filter((result) => result === true),
        switchMap(() => obs2$)
      ).subscribe(console.log) <--- returns [1,2,3] since filter condition returns true

если вам нужно что-то вроде оператора If Else, используйте условный оператор IIF (https://www.learnrxjs.io/learn-rxjs/operators/conditional/iif):

const even$ = of('even');
const odd$ = of('odd');

interval(1000).pipe(
  mergeMap(v =>
    iif(
      () => v % 2 === 0,
      even$,
      odd$
    ))
).subscribe(console.log); <--- will emit every second a string "even" or "odd" based on the condition.
person GBra 4.669    schedule 23.12.2020