Реализация на експоненциално забавяне с rxjs

Angular 7 docs предоставят този пример за практическо използване на rxjs Observables при прилагане на експоненциално забавяне за AJAX заявка:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

Въпреки че разбирам концепцията както за Observables, така и за backoff, не мога съвсем да разбера как точно retryWhen ще изчисли времевите интервали за повторно абониране за източника ajax.

По-конкретно, как zip, map и mapMerge работи в тази настройка?

И какво ще се съдържа в обекта attempts, когато бъде излъчен в retryWhen?

Прегледах техните референтни страници, но все още не мога да разбера това.


person CBlew    schedule 26.10.2018    source източник
comment
Rx е малко объркващо за научаване. retryWhen прави повторен опит по критерий - attempts е потокът от грешки. zip грабва следващия индекс от диапазона. map го умножава по себе си (това е експоненциалната част, тъй като x * x е просто x ** 2), mapMerge изчаква таймерът да приключи, преди да продължи следващият опит. Прехвърлянето му към ajax ще му каже да опита отново, когато ajax възникне грешка и след това ще се откаже експоненциално (когато потокът от грешки издаде грешка - компресирането продължава, картата се изпълнява и mergeMap изчаква таймера, преди да продължи).   -  person Benjamin Gruenbaum    schedule 26.10.2018
comment
И разбира се, можете просто да използвате fetch и да го внедрите без Rx, което би било ~5 LoC на обикновен JavaScript и for цикъл с try/catch :D   -  person Benjamin Gruenbaum    schedule 26.10.2018
comment
@BenjaminGruenbaum едно нещо, което не разбирам, е как zip работи тук ... защо не създава нов Observable от zip всеки път, когато се абонира отново?   -  person Explosion Pills    schedule 26.10.2018
comment
@ExplosionPills Операторът retryWhen се абонира само веднъж за наблюдаемото, което е съставено в него. Той подава всички грешки, получени от източника, към наблюдаемото, съставено в него - параметърът attempts към стрелковата функция е наблюдаем от грешки. По същество, range наблюдаемото "засява" zip с броя повторни опити, които трябва да се опитат.   -  person cartant    schedule 27.10.2018
comment
@BenjaminGruenbaum @cartant И така, операторът zip хваща range наблюдаемото (1, 2, 3) и го обединява с входа attempts? Но защо правя това? Премахнах zip от pipe и функцията изглежда работи по същия начин. И така, какъв беше смисълът от ципирането? Не можем ли просто да игнорираме attempts?   -  person CBlew    schedule 27.10.2018
comment
@CBlew опитите за наблюдение задействат елемент всеки път, когато http заявката е неуспешна.   -  person Benjamin Gruenbaum    schedule 27.10.2018
comment
@cartant Имам същия въпрос като CBlew... ако се абонира само веднъж за съставения наблюдаем, как така използването само на range само по себе си ще се абонира отново завинаги срещу подхода zip?   -  person Explosion Pills    schedule 29.10.2018


Отговори (1)


Прекарах доста време в проучване на това (с цел обучение) и ще се опитам да обясня работата на този код възможно най-подробно.

Първо, ето оригиналния код, пояснен:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}
  1. Достатъчно лесно, създаваме персонализиран оператор backoff от оператор retryWhen. Ще можем да приложим това по-късно във функцията pipe.
  2. В този контекст методът pipe връща потребителски оператор.
  3. Нашият персонализиран оператор ще бъде модифициран оператор retryWhen. Необходим е аргумент на функцията. Тази функция ще бъде извикана веднъж — по-специално, когато този retryWhen бъде срещнат/извикан за първи път. Между другото, retryWhen влиза в действие само когато наблюдаемият източник генерира грешка. След това предотвратява по-нататъшното разпространение на грешката и се абонира отново за източника. Ако източникът генерира резултат без грешка (независимо дали при първо абониране или при повторен опит), retryWhen се пропуска и не се включва.

    Няколко думи за attempts. Това е наблюдаемо. Не е източникът, който може да се наблюдава. Създаден е специално за retryWhen. Има едно и само едно използване: когато абонаментът (или повторният абонамент) за наблюдаемия източник доведе до грешка, attempts задейства next. Дадено ни е attempts и сме свободни да го използваме, за да реагираме по някакъв начин на всеки неуспешен опит за абониране за наблюдаемия източник.

    Така че това е, което ще направим.

    Първо създаваме range(1, maxTries), наблюдаема, която има цяло число за всеки повторен опит, който сме готови да извършим. range е готов да задейства всичките си номера веднага и там, но трябва да задържим конете му: имаме нужда от нов номер само когато се случи нов повторен опит. Ето защо ние...

  4. ... ципирайте го с attempts. Това означава, че съчетайте всяка излъчена стойност на attempts с една стойност на range.

    Не забравяйте, че функцията, в която се намираме в момента, ще бъде извикана само веднъж и по това време attempts ще е задействал next само веднъж — за първоначалния неуспешен абонамент. И така, в този момент нашите две компресирани наблюдаеми са произвели само една стойност.

    Между другото, какви са стойностите на двете наблюдаеми, компресирани в една? Тази функция решава, че: (i) => i. За по-голяма яснота може да се напише (itemFromRange, itemFromAttempts) => itemFromRange. Вторият аргумент не се използва, така че е изпуснат и първият е преименуван на i.

    Какво се случва тук, е просто да пренебрегнем стойностите, задействани от attempts, ние се интересуваме само от факта, че те са задействани. И всеки път, когато това се случи, изтегляме следващата стойност от range наблюдаема...

  5. ... и го на квадрат. Това е за експоненциалната част от експоненциалното забавяне.

    И така, сега, когато (повторното) абониране за източник е неуспешно, имаме непрекъснато нарастващо цяло число (1, 4, 9, 16...). Как да трансформираме това цяло число във времево забавяне до следващото повторно абониране?

    Не забравяйте, че тази функция, в която се намираме в момента, трябва да върне наблюдаема, използвайки attempts като вход. Тази получена наблюдаема се изгражда само веднъж. retryWhen след това се абонира за тази получена наблюдаема и: прави повторен опит да се абонира за източника наблюдаема всеки път, когато получената наблюдаема се задейства next; извиква complete или error на източник observable, когато резултатът observable задейства тези съответни събития.

  6. Накратко, трябва да накараме retryWhen да изчака малко. Може да се използва оператор delay, но настройването на експоненциално нарастване на забавянето вероятно ще бъде болезнено. Вместо това влиза в действие операторът mergeMap.

    mergeMap е пряк път за комбиниране на два оператора: map и mergeAll. map просто преобразува всяко нарастващо цяло число (1, 4, 9, 16...) в timer наблюдаем, който задейства next след изминал брой милисекунди. mergeAll принуждава retryWhen действително да се абонира за timer. Ако последният бит не се случи, нашата получена наблюдаема просто ще задейства next незабавно с timer наблюдаема инстанция като стойност.

  7. На този етап създадохме нашия персонализиран наблюдаем, който ще се използва от retryWhen, за да реши кога точно да се опита да се абонира отново за източника наблюдаем.

В настоящия момент виждам два проблема с тази реализация:

  • Веднага след като нашата получена наблюдаема задейства последния си next (причинявайки последния опит за повторно абониране), тя също незабавно задейства complete. Освен ако наблюдаемият източник не върне резултат много бързо (приемайки, че последният повторен опит ще бъде успешният), този резултат ще бъде игнориран.

    Това е така, защото веднага щом retryWhen чуе complete от нашия наблюдаем, той извиква complete на източника, който все още може да е в процес на правене на AJAX заявка.

  • Ако всички повторни опити са били неуспешни, източникът всъщност извиква complete вместо по-логично error.

За да разрешим и двата проблема, мисля, че нашата получена наблюдаема трябва да задейства error в самия край, след като даде на последния повторен опит известно време, за да се опита да свърши работата си.

Ето моето внедряване на споменатата корекция, което също взема предвид оттеглянето на оператора zip в последния rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

Тествах този код и изглежда, че работи правилно във всички случаи. Не мога да се притеснявам да го обясня подробно точно сега; Съмнявам се, че някой дори ще прочете стената с текст по-горе.

Както и да е, големи благодарности на @BenjaminGruenbaum и @cartant, че ме насочиха на правилния път, за да обмисля всичко това.

person CBlew    schedule 27.10.2018
comment
Това изглежда прекалено сложно. Може да искате да разгледате изпълнението на Alex: github.com/alex-okrushko/backoff-rxjs - person cartant; 29.10.2018
comment
Всъщност прочетох стената от текст - много интересен случай за [de]materialize. Ще видя дали мога да събера грешки от всички повторни опити и да ги издам в изхода на retryWhen - person user776686; 06.04.2021