AngularInDepth се отдалечава от Medium. По-новите статии се хостват на новата платформа inDepth.dev. Благодарим ви, че сте част от дълбокото движение!

За повечето разработчици първият контакт с RxJS се установява от библиотеки, като Angular. Някои функции връщат потоци и за да ги използват, фокусът естествено е върху операторите.

В даден момент смесването на реактивен и част от нереактивния код изглежда практично. Тогава хората се интересуват сами да създават потоци. Всеки път, когато имате работа с асинхронен код или обработка на данни, има вероятност потоците да са добър вариант.

RxJS предлага множество начини за създаване на потоци. Независимо от ситуацията, пред която сте изправени, има един идеален начин да създадете поток. Може да не са ви необходими всички, но познаването им може да ви спести време и малко код.

Сложих всички възможни опции в четири категории въз основа на основната им цел:

  • Поточно предаване на съществуващи данни
  • Генериране на данни
  • Взаимодействайте със съществуващите API
  • Комбинирайте и изберете от съществуващи потоци

Забележка: Примерите използват RxJS 6 и могат да се различават от по-старите версии. Нещо, което със сигурност е различно, е начинът, по който импортирате функциите.

RxJS 6

import {of, from} from 'rxjs';
of(...);
from(...);

RxJS ‹ 6

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/from';
Observable.of(...);
Observable.from(...);
//or
import { of } from 'rxjs/observable/of';
import { from } from 'rxjs/observable/from';
of(...);
from(...);

Бележка относно диаграмите на потока:

  • | означава, че потокът завършва
  • X означава, че потокът завършва с грешка
  • … означава, че потокът продължава безкрайно

Поточно предаване на съществуващи данни

Имате някои данни и искате да ги подадете в поток. Има три разновидности, всички от които също ви позволяват да предоставите планировчик като последен аргумент (Ако искате да научите повече за планировчиците, можете да погледнете моята предишна статия). Всички получени потоци ще бъдат студени.

of

Използвайте of, ако имате само един елемент или няколко отделни.

of(1,2,3)
  .subscribe();
// Produces
// 1 2 3 |

от

Използвайте from, ако имате масив или Iterable и искате всички елементи в него да бъдат излъчени към потока. Можете също да го използвате, за да конвертирате обещание в наблюдаемо.

const foo = [1,2,3];
from(foo)
  .subscribe();
// Produces
// 1 2 3 |

двойки

Потоци двойки ключ/стойност на обект. Особено полезно, ако обектът представлява речник.

const foo = { a: 1, b: 2};
pairs(foo)
  .subscribe();
// Produces
// [a,1] [b,2] |

Какво ще кажете за други структури от данни?

Може би вашите данни се съхраняват в персонализирана структура, която не прилага протокола Iterable или имате рекурсивна, дървовидна структура. В тези случаи една от следните опции може да е подходяща:

  • Първо извлечете данни в масив
  • Използвайте функцията generate от следващия раздел, за да обходите данните
  • Създайте персонализиран поток (вижте този раздел)
  • Създайте итератор

Опции 2 и 3 са обяснени по-късно, така че се фокусирам върху създаването на итератор тук. Можем да създадем поток от iterable чрез извикване на from. Iterable е обект, който може да достави итератор (вижте тази mdn статия, ако се интересувате от подробности).

Един прост начин за създаване на итератор е „генераторна функция“. Когато извикате генераторна функция, тя връща обект, който съответства както на протокола iterable, така и на протокола iterator.

//Our custom data structure
class List {
  add(element) ...
  get(index) ...
  get size() ...
  ...
}
function* listIterator(list) {
  for (let i = 0; i<list.size; i++) {
    yield list.get(i);
  }
}
const myList = new List();
myList.add(1);
myList.add(3);
from(listIterator(myList))
  .subscribe(console.log);
// Produces
// 1 3 |    

Когато извикаме функцията listIterator се връща iterable / iterator. Кодът във функцията не се изпълнява, преди да извикаме subscribe.

Генериране на данни

Знаете какви данни да излъчвате, но искате/трябва да ги генерирате в движение. Всички функции приемат планировчик като последен аргумент. Те произвеждат студени потоци.

диапазон

Издава последователни числа, започващи с начална стойност и продължаващи за определен брой повторения.

range(10, 2) //start with 10 and emit two values
  .subscribe();
// Produces
// 10 11 |

интервал/таймер

Подобно донякъде на диапазон, но излъчва нарастващи числа периодично (т.е. не наведнъж). Разликата между двете е, че таймер ви позволява да дефинирате забавяне за първия елемент. Освен това ви позволява да генерирате само една стойност, като не посочвате период.

interval(1000) //emit every 1000ms = 1 second
  .subscribe()
// Produces
// 0  1  2  3  4 ...
timer(5000, 1000) //the same as above but waits 5000ms before it starts
timer(5000)
.subscribe(i => console.log("foo");
//prints foo after 5 seconds

По-голямата част от интервала от време ще се използва за периодична обработка на данни:

interval(10000).pipe(
  flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)

Това ще получава нови данни на всеки 10 секунди и ще актуализира екрана.

генерирам

По-сложна функция, която ви позволява да излъчвате последователност от всякакъв тип. Има някои претоварвания и ви показвам най-интересното(ите):

generate(
  0,           // start with this value
  x => x < 10, // condition: emit as long as a value is less than 10
  x => x*2     // iteration: double the previous value
).subscribe();
// Produces
// 1 2 4 8 |

Можете също да го използвате за итериране на стойности, ако структурата не имплементира интерфейса Iterable. Нека опитаме това с нашия пример за списък от преди:

const myList = new List();
myList.add(1);
myList.add(3);
generate(
  0,                  // start with this value
  i => i < list.size, // condition: emit until we have processed the whole list
  i => ++i,           // iteration: get next index
  i => list.get(i)    // selection: get value from list 
).subscribe();
// Produces
// 1 3 |

Както можете да видите, добавих още един аргумент: Селекторът. Работи като оператора map и преобразува генерираната стойност в нещо по-полезно.

Празни потоци

Понякога трябва да подадете или върнете поток, който не излъчва никакви данни. Има три функции, по една за всяка възможна ситуация. Можете да подадете планировчик към всички функции. empty и throwError приемат планировчик като аргумент.

празен

Създава поток, който завършва без излъчване на стойност.

empty()
  .subscribe();
// Produces
// |

никога

Създава поток, който никога не завършва, но и никога не излъчва нищо.

never()
  .subscribe();
// Produces
// ...

throwError

Създава поток, който се проваля, без да излъчва стойност.

throwError('error')
  .subscribe();
// Produces
// X

Свържете се със съществуващи API

Не всички библиотеки и целият ви наследен код използват или поддържат потоци. За щастие RxJS предоставя функции за свързване на нереактивен и реактивен код. Този раздел обсъжда само модели, предоставени от RxJS точно за тази цел.

Може да се заинтересувате и от тази обширна статия от Бен Леш, обхващаща вероятно всеки възможен начин за взаимодействие с обещания.

от

Вече имахме това и го изброявам и тук, защото може да се използва за обвиване на обещание в наблюдаем.

from(new Promise(resolve => resolve(1)))
  .subscribe();
// Produces
// 1 |

fromEvent

Добавя слушател на събития към DOM елемент и съм почти сигурен, че знаете това. Това, което може би не знаете е, че можете да го използвате и с други видове, напр. jQuery обект.

const element = $('#fooButton'); // creates a jQuery object for a DOM element
from(element, 'click')
  .subscribe();
// Produces
// clickEvent ...

fromEventPattern

За да разберем защо имаме нужда от това, ако вече имаме fromEvent, трябва да разберем как работи fromEvent. Вземете този код:

from(document, 'click')
  .subscribe();

Той казва на RxJS, че искаме да слушаме събития за щракване от документа. По време на абонамент RxJS открива, че документът е от тип EventTarget, следователно може да извика addEventListener върху него. Ако подадем jQuery обект вместо документ, тогава RxJS знае, че вместо това трябва да извика on.

Този пример с използване на fromEventPattern основно прави същото като fromEvent:

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}
fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
.subscribe(console.log);
//is equivalent to
fromEvent(document, 'click')

Самият RxJS създава действителния слушател (манипулатор) и вашата работа е да го добавяте и премахвате. Целта на fromEventPattern е основно да каже на RxJS как да регистрира и премахва слушатели на събития.

Сега си представете, че използвате библиотека, в която трябва да извикате метод с име registerListener. Вече не можем да използваме fromEvent, защото не знае как да се справи с него.

const listeners = [];
class Foo {
  registerListener(listener) {
    listeners.push(listener);
  }
  emit(value) {
    listeners.forEach(listener => listener(value));
  }
}
const foo = new Foo();
fromEventPattern(listener => foo.registerListener(listener))
  .subscribe();
foo.emit(1);
// Produces
// 1 ...

Когато извикаме foo.emit(1), се извиква слушателят от RxJS и той може да излъчи стойността към потока.

Можете също така да го използвате, за да слушате повече от един тип събитие или да се свържете с всеки API, който комуникира чрез обратни извиквания, напр. API на WebWorker:

const myWorker = new Worker('worker.js');
fromEventPattern(
  handler => { myWorker.onmessage = handler },
  handler => { myWorker.onmessage = undefined }
)
.subscribe();
// Produces
// workerMessage ...

bindCallback

Това е подобно на fromEventPattern, но е предназначено само за единични стойности. Това означава, че потокът завършва след извикване на обратното извикване. Използването също е различно – обгръщате функцията с bindCallback, след което тя магически връща поток, когато бъде извикана:

function foo(value, callback) {
  callback(value);
}
// without streams
foo(1, console.log); //prints 1 in the console
// with streams
const reactiveFoo = bindCallback(foo); 
//when we call reactiveFoo it returns an observable
reactiveFoo(1)
  .subscribe(console.log); //prints 1 in the console
// Produces
// 1 |

websocket

Да, можете действително да създадете websocket връзка и да я изложите като поток:

import { webSocket } from 'rxjs/webSocket'; 
let socket$ = webSocket('ws://localhost:8081');
//receive messages
socket$.subscribe(
  (msg) => console.log('message received: ' + msg),
  (err) => console.log(err),
  () => console.log('complete') * );
//send message
socket$.next(JSON.stringify({ op: 'hello' }));

Наистина е толкова лесно да добавите поддръжка на websocket към вашето приложение. websocket създава тема. Това означава, че можете както да се абонирате за него, за да получавате съобщения, така и да изпращате съобщения чрез него, като се обадите на next.

ajax

Само да знаете: Подобно на websocket и предлага поддръжка за AJAX заявки. Вероятно така или иначе използвате библиотека или рамка с вградена поддръжка на AJAX. И ако не го направите, препоръчвам вместо това да използвате fetch (и polyfill, ако е необходимо) и да обвиете върнатото обещание в наблюдаем (вижте също функцията defer по-долу).

Персонализирани потоци

Понякога вече представените функции не са достатъчно гъвкави. Или имате нужда от повече контрол върху абонаментите.

Предмет

Субектът е специален обект, който ви позволява да излъчвате данни към потока и да го контролирате. Самият обект също е наблюдаем, но ако искате да изложите потока на друг код, се препоръчва да използвате метода asObservable. По този начин не можете случайно да извикате методите на източника.

const subject = new Subject();
const observable = subject.asObservable();
observable.subscribe();
subject.next(1);
subject.next(2);
subject.complete();
// Produces
// 1 2 |

Имайте предвид, че стойностите, излъчени преди абонаменти, се „загубват“:

const subject = new Subject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// Prints
// 2

В допълнение към обичайната тема RxJS предоставя три специализирани версии.

AsyncSubject излъчва само последната стойност след завършване.

const subject = new AsyncSubject();
const observable = subject.asObservable();
observable.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.complete();
// Prints
// 2

BehaviorSubject ви позволява да предоставите стойност (по подразбиране), която ще бъде излъчена на всеки абонат, ако досега не е излъчена друга стойност. В противен случай абонатите получават последната излъчена стойност.

const subject = new BehaviorSubject(1);
const observable = subject.asObservable();
const subscription1 = observable.subscribe(console.log);
subject.next(2);
subscription1.unsubscribe();
// Prints
// 1
// 2
const subscription2 = observable.subscribe(console.log);
// Prints
// 2

ReplaySubject съхранява всички излъчени стойности до определен брой, време или безкрайно. След това всички нови абонати ще получат всички съхранени стойности.

const subject = new ReplaySubject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// Prints
// 1
// 2

Можете да намерите повече информация по темите в документацията на ReactiveX (която предлага и допълнителни връзки). Бен Леш предлага някои прозрения по теми в „По темата за темите“, както и Никълъс Джеймисън „в RxJS: Разбиране на субектите“.

Наблюдаемо

Можете да създадете наблюдаем, като просто използвате оператора new. С функцията, която предавате, можете да контролирате потока. Тази функция се извиква всеки път, когато някой се абонира и получава наблюдател, който можете да използвате като тема, т.е. извикване следващо, пълно и грешка.

Нека преразгледаме нашия пример за списък:

const myList = new List();
myList.add(1);
myList.add(3);
new Observable(observer => {
  for (let i = 0; i<list.size; i++) {
    observer.next(list.get(i));
  }
  observer.complete();
})
.subscribe();
// Produces
// 1 3 |

Функцията може да върне функция за отписване, която се извиква, когато абонатът анулира абонамента. Можете да го използвате, за да почистите или да изпълните някакво довършително действие.

new Observable(observer => {
  //stream it, baby!
  return () => {
                 //clean up
               };
})
.subscribe();

Подклас Observable

Преди появата на lettable операторите това беше начин за внедряване на персонализирани оператори. RxJS разширява Observable вътрешно. Един пример е Subject, друг е операторът publish. Той връща ConnectableObservable, който предоставя допълнителния метод connect.

Внедряване на абонамент

Понякога вече имате обект, който поддържа състояние и може да излъчва стойности. Можете да го превърнете в наблюдаем, ако внедрите интерфейса Subscribable, който се състои само от метод за абониране.

interface Subscribable<T> {
  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable
}

Комбинирайте и изберете съществуващи потоци

Знанието как да създавате индивидуални потоци не е достатъчно. Понякога се сблъсквате с няколко потока, но имате нужда само от един. Някои от функциите са достъпни и като оператори, затова няма да се задълбочавам тук. Мога да препоръчам статия от Max NgWizard K, която дори съдържа някои фантастични анимации.

Още една препоръка: Можете да играете интерактивно с комбинирани оператори на RxMarbles чрез плъзгане на елементи.

Типът ObservableInput

Операторите и функциите, които очакват поток (или масив от потоци), обикновено не работят само с наблюдаеми. Вместо това те всъщност очакват аргументът да бъде от типа ObservableInput, който е дефиниран по следния начин:

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

Това означава, че можете напр. предавайте обещания или масиви, без да е необходимо първо да ги конвертирате в наблюдаеми!

отлагам

Основната цел е да се отложи създаването на наблюдаем до момента, когато някой иска да се абонира. Това е полезно, ако

  • създаването на наблюдаемото е изчислително скъпо
  • искате нов наблюдаем за всеки абонат
  • искате да избирате между различни наблюдаеми по време на абонамента
  • някакъв код не трябва да се изпълнява преди абонамент

Последната точка включва един не толкова очевиден случай на употреба: обещания (defer може също да върне обещание). Вземете този пример с помощта на API за извличане:

function getUser(id) {
  console.log("fetching data");
  return fetch(`https://server/user/${id}`);
}
const userPromise = getUser(1);
console.log("I don't want that request now");
//somewhere else
userPromise.then(response => console.log("done");
// Prints
// fetching data
// I don't want that request now
// done

Обещанията се изпълняват незабавно, докато потоците се изпълняват, когато се абонирате. В момента, в който извикаме getUser, се изпраща заявка, дори ако не сме искали това в този момент. Разбира се, можем да използваме от, за да конвертираме обещание в наблюдаемо, но обещанието, което предаваме, вече е създадено/изпълнено. defer ни позволява да изчакаме до абонамента:

const user$ = defer(() => getUser(1));
console.log("I don't want that request now");
//somewhere else
user$.subscribe(response => console.log("done");
// Prints
// I don't want that request now
// fetching data
// done

iif

iif покрива специален случай на употреба на отлагане: Вземане на решение между два потока по време на абонамента:

iif(
  () => new Date().getHours() < 12,
  of("AM"),
  of("PM")
)
.subscribe();
// Produces
// AM before noon, PM afterwards

За да цитирам документацията:

Всъщност iif може лесно да се имплементира с defer и съществува само за удобство и четливост.

onErrorResumeNext

Стартира първия поток и ако не успее, продължава със следващия поток. Грешката се игнорира.

const stream1$ = of(1, 2).pipe(
  tap(i => { if(i>1) throw 'error'}) //fail after first element
);
const stream2$ = of(3,4);
onErrorResumeNext(stream1$, stream2$)
  .subscribe(console.log);
// Produces
// 1 3 4 |

Това може да бъде полезно, ако имате повече от една уеб услуга. В случай, че основният не успее, резервната услуга може да бъде извикана автоматично.

forkJoin

Позволява на потоците да се изпълняват едновременно и излъчва последните им стойности в масив, когато бъдат завършени. Тъй като се излъчват само последните стойности на всеки поток, той обикновено се използва за потоци, които излъчват само един елемент, като HTTP заявки. Искате заявките да се изпълняват паралелно и да правите нещо, когато всички имат отговори.

function handleResponses([user, account]) {
  // do something
}
forkJoin(
  fetch("https://server/user/1"),
  fetch("https://server/account/1")
)
.subscribe(handleResponses);

сливане / съединяване

Излъчва всяка стойност, която се излъчва от една от наблюдаемите източници.

merge поддържа параметър, който ви позволява да определите за колко изходни потоци са абонирани едновременно. По подразбиране е неограничен. Стойност 1 би означавала слушане на един изходен поток и когато той приключи, абониране за следващия. Тъй като това е много често срещан сценарий, RxJS предоставя изрична функция: concat.

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2)),
  timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
  2 //two concurrent streams
)
.subscribe();
// Subscribes to stream 1 and 2 only
// prints
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms
// Stream 1 has completed, now subscribe to stream 3
// prints
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
  1
)
// is equal to
concat(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
)
// prints
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms

zip / combineLatest

Докато merge и concat излъчват всички стойности от изходните потоци поотделно, zip и combineLatest комбинират една стойност от всеки изходен поток и ги излъчват заедно. zip комбинира първите стойности, излъчвани от всички(!) изходни потоци, вторите стойности и т.н. Това е полезно, ако съдържанието на потоците е свързано.

zip(
  interval(1000),
  interval(1200),
)
.subscribe();
// Produces
// [0, 0] [1, 1] [2, 2] ...

combineLatest е подобен, но комбинира най-новите стойности, излъчвани от изходните потоци. Нищо не се случва, докато всички изходни потоци не излъчат поне една стойност. От този момент нататък всеки път, когато изходен поток излъчва стойност, тя се комбинира с последните стойности на другите потоци.

combineLatest(
  interval(1000),
  interval(1200),
)
.subscribe();
// Produces
// [0, 0] [1, 0] [1, 1] [2, 1] ...

И двете функции ви позволяват да подадете селекторна функция, която може да комбинира елементите в нещо различно от масив:

zip(
  interval(1000),
  interval(1200),
  (e1, e2) -> e1 + e2
)
.subscribe();
// Produces
// 0 2 4 6 ...

раса

Избира се първият поток, който излъчва стойност. Така че полученият поток по същество е най-бързият поток.

race(
  interval(1000),
  of("foo")
)
.subscribe();
// Produces
// foo |

Тъй като of произвежда стойност незабавно, по-бързият поток и потокът се избират.

Заключение

Това са много начини за създаване на наблюдаеми. Познаването им е от съществено значение, ако искате да създадете реактивни API или искате да комбинирате наследени API с реактивни.

Представих ви всички опции, но може да се каже много повече за всички тях. Ако искате да се гмурнете по-дълбоко, силно препоръчвам да се консултирате с документацията или да прочетете предложените статии.

Друг интересен начин да получите представа е RxViz. Пишете RxJS код и получените потоци след това се показват графично и анимирани.