Angular Rxjs: заставьте concat ждать очень долгого нажатия

Я пытаюсь автоматизировать процесс обновления некоторых данных в серверной части моей программы. Я использую свой интерфейс Angular, где я создал функцию, доступ к которой имеет только главный пользователь, и это должно заставить его входить в систему в каждой администрации (арендаторе), где он будет загружать некоторые объекты с некоторыми неправильными данными внутри, запрашивая у службы Google правильные данные и обновите данные в бэкэнде, а также выполните все эти операции для каждого клиента.

Я задумал написать каждую из этих операций как наблюдаемую и использовать concat, чтобы делать все по порядку, но прежде чем даже закончить получение правильных данных, что я делаю внутри крана, он уже пытается войти в систему в следующем арендаторе, поэтому, когда он на самом деле имеет правильные данные, он не сможет загрузить их в серверную часть, поскольку он откажется от них как от неправильного клиента.

Я думаю, что эта проблема вызвана длительными операциями с краном (и мне нужно сделать кое-что, что потребует еще больше времени).

Это мой фрагмент кода (без посторонних вещей):

const obsList = [] as Observable<any>[];
this.assignedTenants.forEach(tenant => {
  const obsList2 = [] as Observable<any>[];
  obsList.push(this.authenticationService.login(new Credentials(usr, psw), tenant.id));
  obsList.push(this.structureService.getStructuresWithWrongAltitude()
    .pipe(tap(structuresReceived => {
      obsList2 = [] as Observable<any>[];
      if (structuresReceived != null && structuresReceived.length > 0) {
        structuresReceived.forEach(s => {
          this.getElevation(new google.maps.LatLng(s.centro.coordinates[0], s.centro.coordinates[1]))
            .then(a => {
              s.centroAltitudine = a;
              this.obsList2.push(this.structureService.putStructure(s));
            })
            .catch();
        });
      }
  })));
  obsList.push(forkJoin(obsList2)
    .pipe(tap(() => this.storageService.logout())));
});
concat(...obsList).subscribe();

Как вы можете видеть, этот код должен создать и выполнить 3 наблюдаемых объекта для каждого клиента: первый используется для входа в систему, второй - для получения неправильных данных, получения правильных данных и подготовки к третьему, который обновит данные. Как я уже сказал, обычно при входе в кран из второго наблюдаемого, getStructuresWithWrongAltitude, я уже могу видеть, используя журналы, что он пытается войти в систему для других клиентов.

Моя теория состоит в том, что как только он получает неправильные данные, он пытается выполнить третью наблюдаемую, которая все еще недействительна, и перейти к следующему клиенту, но я не знаю, как это исправить.

Мне понадобится способ, чтобы вторая наблюдаемая не испускалась, пока нажатие не будет завершено, или другой способ предотвратить продолжение concat до завершения других операций

Спасибо за помощь

РЕДАКТИРОВАТЬ:

Я смог исправить это, превратив getElevation (который возвращает обещание) в наблюдаемый список, который, в свою очередь, создаст новый наблюдаемый список для сохранения данных.

Как я уже сказал, мне нужно сделать что-то очень похожее, с той разницей, что на этот раз крану действительно придется выполнять много вычислений, которые займут много времени, поэтому я не смогу использовать то же исправление, что и мой Остается вопрос: можно ли заставить concat ждать, пока кран не закончится?

РЕДАКТИРОВАТЬ 2 для пояснения

Как я сказал в своем последнем редактировании, этот конкретный пример был решен путем преобразования материала внутри крана в другие наблюдаемые, но у меня почти такая же проблема с другой функцией.

этой функции необходимо найти файлы внутри папки перед их загрузкой

const folderInput = this.folderInput.nativeElement;
folderInput.onchange = () => {
  this.filesUploaded = folderInput.files;
  const obsList = [] as any[];

  this.assignedTenants.forEach(tenant => {
    const obsList2 = [] as Observable<any>[];

    obsList.push(this.authenticationService.login(new Credentials(usr, psw), tenant.id));

    obsList.push(this.fileService.getAll()
      .pipe(
        tap(filesReceived => {
          if (filesReceived != null && filesReceived.length > 0) {
            console.log('upload picture: received list of files to update');

            let i = filesReceived?.length;
            filesReceived?.forEach(f => {
              const pathReceived = (f.originalFilename as string).substr(1).split('\\');

              let found = false;
              let index = -1;
              
              // searching the file in the folder
              //...
              
              if (found) {
                console.log('found a file');
                const selectedFile = this.filesUploaded[index];
                const formData = new FormData();
                formData.append('file', selectedFile, selectedFile.name);
                obsList2.push(this.fileService.updateFile(formData, f.id));
              }
              i--;
            });
            console.log('upload picture: updated obsList2');
            obsList.push(forkJoin(obsList2).subscribe(() => {
              console.log('upload picture: uploaded pictures');
              this.storageService.logout();
            }));
          }
      }))
    );
  });

  this.loadingIndicatorService.loading$.next(true);
  let counter = obsList.length;
  concat(...obsList).subscribe(() => {
    counter--;
    console.log('upload pictures: remaining phases: ' + counter);
    if (counter <= 0) {
      this.loadingIndicatorService.loading$.next(false);
  }
});
};
folderInput.click();

person MassimoDeFacci4EmmeService    schedule 11.02.2021    source источник
comment
Здесь есть несколько вещей, которые я не понимаю, но первая связана с константой obsList2. Вы объявляете его в третьей строке, но затем сбрасываете в седьмой строке с помощью кода obsList2 = [] as Observable<any>[];. Вы уверены, что это действительно работает в Typescript? Как только мы это проясним, мы можем приступить к остальному.   -  person Picci    schedule 11.02.2021
comment
Вы пробовали этот пост ТАК? stackoverflow.com/ вопросы / 43336549 /. В этой статье также рассказывается о порядке событий - juristr.com/blog / 2019/01 / Guarantee-Event-Order-with-RxJS   -  person Andy Danger Gagne    schedule 11.02.2021
comment
@Picci да, это работает, я просто снова делаю его пустым, чтобы убедиться, но он не генерирует никаких ошибок или предупреждений   -  person MassimoDeFacci4EmmeService    schedule 11.02.2021
comment
@AndyDangerGagne проблема не в том, чтобы их исключить по порядку, а в том, чтобы заставить их ждать нажатия второй трубы, что займет некоторое время. Как я уже сказал, проблема в примере была решена, но она не работает с более сложным краном.   -  person MassimoDeFacci4EmmeService    schedule 11.02.2021


Ответы (2)


Короче: Нет

Вы можете никогда никогда не заставить синхронный код ждать асинхронного кода в javascript. JS работает в одном потоке, и если вы попробуете это сделать, ваша программа остановится. JS действительно имеет async-await, чтобы он выглядел так, как будто ожидает синхронный код (но он просто помещает продолжение в цикл событий и не ждет вообще).

С другой стороны, весь синхронный код в вашем tap завершится (в 100% случаев) до того, как сможет выполняться следующая часть наблюдаемого конвейера.

Но

Есть хорошие новости, вам никогда не нужно

Вам никогда не понадобится синхронный код для ожидания асинхронного кода в javascript. Если вы используете наблюдаемые объекты, у вас есть все инструменты, необходимые для определения порядка выполнения кода.

Если внутри вашего tap, если у вас есть .then или .subscribe, вы, вероятно, делаете что-то не так. В RxJS это не зря считается запахом кода.

Ваш код (в том виде, в каком он находится прямо сейчас) трудно читать, поэтому трудно получить больше, чем общие черты того, что вы пытаетесь сделать.

Вот как я это понимаю:

Для каждого пользователя:

  1. Войдите в систему, используя идентификатор
  2. call this.fileService.getAll () // Это сделано как авторизованный пользователь? Ваша служба справится с этим за вас?
  3. вызовите this.fileService.updateFile для файлов 0+

Вот грубая попытка. Это точно не будет компилироваться. Кроме того, это можно было бы исправить, если бы я знал немного больше о функционировании ваших наблюдаемых, но они немного загадочны из кода, показанного выше.

from(this.assignedTenants).pipe(
  concatMap(tenant => concat(
    this.authenticationService.login(new Credentials(usr, psw), tenant.id),
    this.fileService.getAll().pipe(
      switchMap(filesReceived => forkJoin(
        filesReceived.map(f => {
          //Code to get formData and such
          if(found){
            return this.fileService.updateFile(formData, f.id);
          }
          return null;
        }).filter(v => v != null)
      )
    ))
  )),
).subscribe(result => {
  console.log("Result of forkjoin: ", result);
}

Некоторые рефакторинги:

/*****
 * An Observable that gets all files, updates them, then completes
 *****/
function updateFiles(): Observable<any[]>{
  return this.fileService.getAll().pipe(
    // This map should turn every file received into either:
    //  1. A service call to update that file
    //  2. null
    map(filesReceived => filesReceived.map(f => {
      //Code to get formData and such
      if(found){
        return this.fileService.updateFile(formData, f.id);
      }
      return null;
    })),
    // Filter out null entries in our serviceCalls array
    map(serviceCalls => serviceCalls.filter(
      serviceCall => serviceCall != null
    )),
    // subscribe to all our service calls at once
    switchMap(serviceCalls => forkJoin(serviceCalls))
  );
}

from(this.assignedTenants).pipe(
  // ConcatMap won't start the second tenant until the first one's 
  // updateFiles() observable completes.
  concatMap(tenant => concat(
    this.authenticationService.login(new Credentials(usr, psw), tenant.id),
    updateFiles()
  )),
).subscribe({
  next: result => console.log("The Result of login(...) or updateFiles()", result),
  complete: () => console.log("Every file for every tenant is done")
})
person Mrk Sef    schedule 11.02.2021
comment
@ MassimoDeFacci4EmmeService Я потратил время на рефакторинг и прокомментировал код, который я вам отправил. Он по-прежнему не компилируется как вход, но я попытался создать некоторое разделение задач, чтобы разбить отображение, фильтрацию, forkJoining на отдельные функции, чтобы их было легче читать. Надеюсь, это лучшее место для начала / учебы. - person Mrk Sef; 12.02.2021

Если я понял суть проблемы, я думаю, что суть в том, что операция в tap является асинхронной, и поэтому tap не совсем подходящий оператор, если вы хотите дождаться ее результата. Так что вам лучше использовать что-то вроде concatMap. Другое, что я бы сделал, - это преобразовать Promise в Observable, а затем использовать конвейер для выполнения операции выборки, вызвать службу Google и затем обновить. Последний пункт касается использования concat в конце. Это означает, что вы будете атаковать каждого арендатора последовательно. Если это то, что вы хотите сделать, это нормально. Если вы думаете, что можете действовать параллельно, вы можете подумать о замене concat на forkJoin.

Код будет выглядеть примерно так.

const obsList = [] as Observable<any>[];
this.assignedTenants.forEach(tenant => {
  obsList.push(
    // create here an Observable which executes login, fetch the wrong data, ask Google for the right data and update sequentially
    this.authenticationService.login(new Credentials(usr, psw), tenant.id)).pipe(
      concatMap(() => this.structureService.getStructuresWithWrongAltitude()),
      concatMap(structuresReceived => {
        const obsList3 = [] as Observable<any>[];
        if (structuresReceived != null && structuresReceived.length > 0) {
          structuresReceived.forEach(s => {
            // transform the Promise into an Observable using the from function
            // and then concatenate with the update operation
            obsList3.push(
              from(this.getElevation(new google.maps.LatLng(s.centro.coordinates[0], s.centro.coordinates[1]))).pipe(
                concatMap(a => {
                  s.centroAltitudine = a;
                  return this.structureService.putStructure(s)
                ),
              )
            )
          }
        }
        // execute the calls to Google in parallel and  (for each tenant)
        return forkJoin(obsList3)
      }),
      concatMap(() => this.storageService.logout())
    )
  });
});
concat(...obsList).subscribe();
person Picci    schedule 11.02.2021