Observable.forkJoin() не выполняется

У меня есть следующий код:

//Loop: For each user ID/Role ID, get the data
userMeta.forEach((businessRole) => {
  Observable.forkJoin(
    af.database.object('/roles/'+businessRole.$value),
    af.database.object('/users/'+businessRole.$key)
  ).subscribe(
    data => {
      console.log("Data received");
      data[1].role = data[0];
      this.users.push(data[1]);
    },
    err => console.error(err)
  );

Я пытаюсь подписаться на результат двух наблюдаемых, используя forkJoin.

По некоторым причинам сообщение «Данные получены» не отображается.

Мои переменные userMeta отлично выглядят в console.log:

введите описание изображения здесь

Что не так?

Обновление: следующий код тоже ничего не возвращает

let source = Observable.forkJoin(
        af.database.object('/roles/'+businessRole.$value),
        af.database.object('/users/'+businessRole.$key)
    );
    let subscription = source.subscribe(
      function (x) {
    console.log("GOT: " + x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

На самом деле я пытаюсь улучшить производительность следующего кода:

//Subscription 3: role ID to role Name
        af.database.object('/roles/'+businessRole.$value)
        .subscribe((roleData) => {
        //Subscription 4: Get user info
        af.database.object('/users/'+businessRole.$key).subscribe(user => {

person TheUnreal    schedule 28.10.2016    source источник
comment
forkJoin() выдает значение после завершения обоих Observables, так что вы уверены, что они это делают? Может быть, один из них заканчивается ошибкой...   -  person martin    schedule 28.10.2016
comment
Ошибок тоже нет (см. обновление вопроса). Они также должны работать, потому что прежний код был подпиской внутри подписки, и это работало.   -  person TheUnreal    schedule 28.10.2016
comment
forkJoin() не передает ошибки из исходных Observables, поэтому ничего не будет напечатано, даже если возникнут ошибки. Если вы хотите убедиться, что он не выдает ошибок, вам нужно подписаться на каждый из исходных Observables.   -  person martin    schedule 28.10.2016


Ответы (5)


forkJoin() требует, чтобы все исходные Observable генерировали хотя бы раз и до конца.

Следующая демонстрация завершается, как и ожидалось:

const source = forkJoin(
  from([1,2,3]),
  from([9,8,7,6])
).subscribe(
  x => console.log('GOT:', x),
  err => console.log('Error:', err),
  () => console.log('Completed')
);

Живая демонстрация: https://stackblitz.com/edit/rxjs-urhkni

GOT: 3,6
Completed

Январь 2019 г.: обновлено для RxJS 6

person martin    schedule 28.10.2016
comment
Вы проверили мой обновленный код в конце вопроса? потому что он работает с теми же наблюдаемыми, просто не работает forkJoin(). Это действительно странно... - person TheUnreal; 28.10.2016
comment
@TheUnreal Но это не то же самое, что вы хотите сделать с forkJoin(). .subscribe((roleData) => {... выдает значение при каждом вызове next. forkJoin(). forkJoin() требует завершения обоих Observable. в противном случае он ничего не генерирует. - person martin; 28.10.2016
comment
Что ж, похоже, я пропустил forkJoin(). Поскольку ваш ответ верен, я одобрю его и открою новый вопрос о моем случае. - person TheUnreal; 28.10.2016
comment
Observable.combineLatest() может быть тем, что вы ищете @TheUnreal. - person Jakub Barczyk; 16.10.2017
comment
@JakubBarczyk Спасибо. Если бы я мог проголосовать за вас дважды, я бы это сделал. - person David Morton; 20.07.2018
comment
Наглядный пример forkJoin, подробно описывающий метод complete, отличный материал - person Drenai; 10.10.2018

Просто добавьтеObserver.complete();

Не работает:

observer.next(...)

Будет работать:

observer.next(...);
observer.complete();

Надеюсь, поможет.

person Itay Ben Shmuel    schedule 17.04.2019

Я столкнулся с похожей проблемой: я динамически создавал список наблюдаемых и заметил, что forkjoin() никогда не выдает и не завершается, если список наблюдаемых пуст, тогда как Promise.all() разрешается с пустым списком :

Observable.forkJoin([])
    .subscribe(() => console.log('do something here')); // This is never called

Обходной путь, который я нашел, состоит в том, чтобы проверить длину списка и не использовать этот оператор, когда он пуст.

return jobList.length ? Observable.forkJoin(jobList) : Observable.of([]);
person bgondy    schedule 06.03.2017
comment
У меня есть генераторы событий для дочерних компонентов, которые срабатывают динамически. Я привязываю его к выходному параметру. Я прикрепил его к родителю, и мне нужно дождаться завершения всех наблюдаемых. - person chris_r; 28.08.2018

У меня была аналогичная проблема с использованием Angular 2/Angularfire 2, особенно когда я искал, существуют ли пользователи по электронной почте. В одном случае пользователь существует и я получил массив одного объекта из Observable. В другом случае пользователя не существовало, и я получил пустой массив.

Когда я использовал forkJoin с селектором результата и подпиской, ни селектор результата, ни функция подписки никогда не запускались. Однако, когда я попытался

Observable.zip(
  FirebaseListObservable,
  FirebaseListObservable,
  (...results) => {
    return results.map(some code here)
  }
).subscribe(res => console.log(res));

И селектор, и подписка работали. Я предполагаю, что это связано с ответом @martin, где forkJoin требует завершения наблюдаемых, потому что по определению он возвращает последние выбросы. Если наблюдаемое никогда не завершается, я полагаю, что у него никогда не может быть последнего выброса.

Возможно, наблюдаемые объекты списка angularfire (или наблюдаемые объекты в вашем случае) никогда не завершатся, что делает невозможным использование forkJoin. К счастью, zip имеет похожее поведение и все еще работает, с той разницей, что он может повторяться несколько раз, если данные изменяются в Firebase, тогда как forkJoin объединяет только последний ответ.

В моем случае я смотрю на 1) использование zip и допускаю, что мой код может выполняться несколько раз, если пользовательские данные изменяются, пока .zip все еще работает, 2) вручную отключать zip после возврата первого набора данных или 3) отказаться от Angularfire и попробовать API Firebase напрямую, используя что-то вроде .once, чтобы посмотреть, смогу ли я получить наблюдаемое, которое завершает и запускает forkJoin.

person ansorensen    schedule 15.02.2017
comment
у меня была точно такая же проблема, которую можно было довольно просто решить, используя Observable.combineLatest вместо Observable.forkJoin. Кредиты идут на github.com/angular/angularfire2/issues/617 - person djnose; 21.02.2018
comment
Спасибо, что поделились этим, это действительно очень полезно. - person Becario Senior; 30.08.2018
comment
Спасибо, это было чрезвычайно полезно. Я выполнял запрос в Firestore, который может возвращать или не возвращать значение, и когда их не было, forkJoin ничего не возвращал. Я просто заменил forkJoin на CombineLatest, и все заработало отлично. Спасибо, что сэкономили еще час исследований. - person Steve Klock; 16.09.2020

У меня была та же проблема, и я не мог заставить работать оператор forkJoin, поэтому я просто использовал combineLatest, который сработал!

person Dávid Konkoly    schedule 29.09.2020