AngularInDepth уходит от Medium. Более свежие статьи размещаются на новой платформе inDepth.dev. Спасибо за то, что участвуете в глубоком движении!

Для большинства разработчиков первый контакт с RxJS устанавливается библиотеками, такими как Angular. Некоторые функции возвращают потоки, и для их использования основное внимание, естественно, уделяется операторам.

В какой-то момент смешение реактивного и некоторого нереактивного кода кажется практичным. Тогда люди сами начинают интересоваться созданием стримов. Всякий раз, когда вы имеете дело с асинхронным кодом или обработкой данных, скорее всего, потоки - хороший вариант.

RxJS предлагает множество способов создания потоков. В какой бы ситуации вы ни оказались, есть один идеальный способ создать поток. Возможно, они вам не понадобятся, но их знание поможет сэкономить время и немного кода.

Я разделил все возможные варианты на четыре категории в зависимости от их основного предназначения:

  • Потоковая передача существующих данных
  • Генерировать данные
  • Взаимодействовать с существующими API
  • Объедините и выберите из существующих потоков

Примечание. В примерах используется RxJS 6, и они могут отличаться от более старых версий. Что-то, что наверняка отличается, - это способ импорта функций.

RxJS 6

import {of, from} from 'rxjs';
of(...);
from(...);

RxJS ‹6

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/from';
Observable.of(...);
Observable.from(...);
//or
import { of } from 'rxjs/observable/of';
import { from } from 'rxjs/observable/from';
of(...);
from(...);

Примечание к диаграммам потоков:

  • | означает, что поток завершен
  • X означает, что поток завершается с ошибкой
  • … Означает, что поток продолжается бесконечно

Потоковая передача существующих данных

У вас есть данные, и вы хотите передать их в поток. Существует три разновидности, каждая из которых также позволяет вам предоставить планировщик в качестве последнего аргумента (если вы хотите узнать больше о планировщиках, вы можете взглянуть на мою предыдущую статью). Все полученные потоки будут холодными.

of

Используйте of, если у вас только один элемент или несколько отдельных.

of(1,2,3)
  .subscribe();
// Produces
// 1 2 3 |

из

Используйте from, если у вас есть массив или Iterable и вы хотите, чтобы все элементы в нем передавались в поток. Вы также можете использовать его для преобразования обещания в наблюдаемое.

const foo = [1,2,3];
from(foo)
  .subscribe();
// Produces
// 1 2 3 |

пары

Потоковая передача пар ключ / значение объекта. Особенно полезно, если объект представляет собой словарь.

const foo = { a: 1, b: 2};
pairs(foo)
  .subscribe();
// Produces
// [a,1] [b,2] |

А как насчет других структур данных?

Возможно, ваши данные хранятся в специальной структуре, которая не реализует протокол Iterable, или у вас есть рекурсивная древовидная структура. В таких случаях может подойти один из следующих вариантов:

  • Сначала извлеките данные в массив
  • Используйте функцию generate из следующего раздела, чтобы перебирать данные.
  • Создайте собственный поток (см. Этот раздел)
  • Создать итератор

Варианты 2 и 3 объясняются позже, поэтому я сосредоточусь на создании итератора здесь. Мы можем создать поток из итеративного, вызвав from. iterable - это объект, который может доставить итератор (см. Эту статью mdn, если вас интересуют подробности).

Один из простых способов создать итератор - это функция-генератор. Когда вы вызываете функцию генератора, она возвращает объект, который соответствует как протоколу iterable, так и протоколу iterator.

//Our custom data structure
class List {
  add(element) ...
  get(index) ...
  get size() ...
  ...
}
function* listIterator(list) {
  for (let i = 0; i<list.size; i++) {
    yield list.get(i);
  }
}
const myList = new List();
myList.add(1);
myList.add(3);
from(listIterator(myList))
  .subscribe(console.log);
// Produces
// 1 3 |    

Когда мы вызываем функцию listIterator, возвращается итератор / итератор. Код внутри функции не выполняется до вызова subscribe.

Генерировать данные

Вы знаете, какие данные выдавать, но хотите / должны генерировать их на лету. Все функции принимают планировщик в качестве последнего аргумента. Они производят холодные струи.

диапазон

Выдает последовательные числа, начинающиеся с начального значения и продолжающиеся определенное количество итераций.

range(10, 2) //start with 10 and emit two values
  .subscribe();
// Produces
// 10 11 |

интервал / таймер

В чем-то похоже на range, но периодически (т.е. не сразу) выдает возрастающие числа. Разница между ними в том, что timer позволяет вам определять задержку для первого элемента. Это также позволяет создавать только одно значение, не указывая период.

interval(1000) //emit every 1000ms = 1 second
  .subscribe()
// Produces
// 0  1  2  3  4 ...
timer(5000, 1000) //the same as above but waits 5000ms before it starts
timer(5000)
.subscribe(i => console.log("foo");
//prints foo after 5 seconds

Большая часть временного интервала будет использоваться для периодической обработки данных:

interval(10000).pipe(
  flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)

Это будет получать новые данные каждые 10 секунд и обновлять экран.

генерировать

Более сложная функция, позволяющая генерировать последовательность любого типа. В нем есть несколько перегрузок, и я покажу вам наиболее интересные из них:

generate(
  0,           // start with this value
  x => x < 10, // condition: emit as long as a value is less than 10
  x => x*2     // iteration: double the previous value
).subscribe();
// Produces
// 1 2 4 8 |

Вы также можете использовать его для перебора значений, если структура не реализует интерфейс Iterable. Давайте попробуем это с помощью нашего предыдущего примера со списком:

const myList = new List();
myList.add(1);
myList.add(3);
generate(
  0,                  // start with this value
  i => i < list.size, // condition: emit until we have processed the whole list
  i => ++i,           // iteration: get next index
  i => list.get(i)    // selection: get value from list 
).subscribe();
// Produces
// 1 3 |

Как видите, я добавил еще один аргумент: селектор. Он работает как оператор map и преобразует сгенерированное значение в нечто более полезное.

Пустые потоки

Иногда вам нужно передать или вернуть поток, который не передает никаких данных. Есть три функции, по одной для каждой возможной ситуации. Вы можете передать планировщик всем функциям. empty и throwError принимают планировщик в качестве аргумента.

пустой

Создает поток, который завершается без выдачи значения.

empty()
  .subscribe();
// Produces
// |

никогда

Создает поток, который никогда не завершается, но и никогда ничего не испускает.

never()
  .subscribe();
// Produces
// ...

throwError

Создает поток, который завершается ошибкой без выдачи значения.

throwError('error')
  .subscribe();
// Produces
// X

Подключитесь к существующим API

Не все библиотеки и весь ваш устаревший код используют или поддерживают потоки. К счастью, RxJS предоставляет функции для объединения нереактивного и реактивного кода. В этом разделе обсуждаются только шаблоны, предоставленные RxJS именно для этой цели.

Вас также может заинтересовать эта обширная статья от Бена Леша, в которой описаны все возможные способы взаимодействия с обещаниями.

из

У нас уже было это, и я тоже перечисляю его здесь, потому что его можно использовать для обертывания обещания в наблюдаемое.

from(new Promise(resolve => resolve(1)))
  .subscribe();
// Produces
// 1 |

fromEvent

Добавляет прослушиватель событий к элементу DOM, и я уверен, что вы это знаете. Возможно, вы не знаете, что вы также можете использовать его с другими типами, например объект jQuery.

const element = $('#fooButton'); // creates a jQuery object for a DOM element
from(element, 'click')
  .subscribe();
// Produces
// clickEvent ...

fromEventPattern

Чтобы понять, зачем нам это, если у нас уже есть fromEvent, нам нужно понять, как работает fromEvent. Возьмите этот код:

from(document, 'click')
  .subscribe();

Он сообщает RxJS, что мы хотим прослушивать события щелчка из документа. Во время подписки RxJS обнаруживает, что документ относится к типу EventTarget, поэтому он может вызвать для него addEventListener. Если мы передаем объект jQuery вместо документа, тогда RxJS знает, что вместо этого он должен вызвать on.

Этот пример с использованием fromEventPattern в основном делает то же самое, что и fromEvent:

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}
fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
.subscribe(console.log);
//is equivalent to
fromEvent(document, 'click')

Сам RxJS создает фактический слушатель (обработчик), и ваша задача - добавлять и удалять его. Цель fromEventPattern - сообщить RxJS, как регистрировать и удалять прослушиватели событий.

Теперь представьте, что вы используете библиотеку, в которой вам нужно вызвать метод с именем registerListener. Мы больше не можем использовать fromEvent, потому что он не знает, как с этим бороться.

const listeners = [];
class Foo {
  registerListener(listener) {
    listeners.push(listener);
  }
  emit(value) {
    listeners.forEach(listener => listener(value));
  }
}
const foo = new Foo();
fromEventPattern(listener => foo.registerListener(listener))
  .subscribe();
foo.emit(1);
// Produces
// 1 ...

Когда мы вызываем foo.emit (1), вызывается слушатель из RxJS, который может передать значение потоку.

Вы также можете использовать его для прослушивания более одного типа событий или подключения к любому API, который обменивается данными через обратные вызовы, например. API WebWorker:

const myWorker = new Worker('worker.js');
fromEventPattern(
  handler => { myWorker.onmessage = handler },
  handler => { myWorker.onmessage = undefined }
)
.subscribe();
// Produces
// workerMessage ...

bindCallback

Это похоже на fromEventPattern, но предназначено только для отдельных значений. То есть поток завершается после вызова обратного вызова. Использование также отличается - вы обертываете функцию с помощью bindCallback, а затем она волшебным образом возвращает поток при его вызове:

function foo(value, callback) {
  callback(value);
}
// without streams
foo(1, console.log); //prints 1 in the console
// with streams
const reactiveFoo = bindCallback(foo); 
//when we call reactiveFoo it returns an observable
reactiveFoo(1)
  .subscribe(console.log); //prints 1 in the console
// Produces
// 1 |

веб-сокет

Да, вы действительно можете создать соединение с веб-сокетом и предоставить его как поток:

import { webSocket } from 'rxjs/webSocket'; 
let socket$ = webSocket('ws://localhost:8081');
//receive messages
socket$.subscribe(
  (msg) => console.log('message received: ' + msg),
  (err) => console.log(err),
  () => console.log('complete') * );
//send message
socket$.next(JSON.stringify({ op: 'hello' }));

Добавить поддержку веб-сокетов в ваше приложение действительно так просто. websocket создает тему. Это означает, что вы можете как подписаться на него, чтобы получать сообщения, так и отправлять сообщения через него, вызывая next.

ajax

Просто чтобы вы это знали: похоже на websocket и предлагает поддержку запросов AJAX. В любом случае вы, вероятно, используете библиотеку или фреймворк со встроенной поддержкой AJAX. А если вы этого не сделаете, я рекомендую вместо этого использовать выборку (и, при необходимости, полифил) и обернуть возвращенное обещание в наблюдаемое (см. Также функцию defer ниже).

Пользовательские потоки

Иногда уже представленные функции недостаточно гибкие. Или вам нужно больше контроля над подписками.

Тема

Субъект - это особый объект, который позволяет передавать данные в поток и управлять им. Сам объект также является наблюдаемым, но если вы хотите предоставить поток другому коду, рекомендуется использовать метод asObservable. Таким образом, вы не сможете случайно вызвать исходные методы.

const subject = new Subject();
const observable = subject.asObservable();
observable.subscribe();
subject.next(1);
subject.next(2);
subject.complete();
// Produces
// 1 2 |

Обратите внимание, что значения, переданные до подписки, «теряются»:

const subject = new Subject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// Prints
// 2

В дополнение к обычной теме RxJS предоставляет три специализированных версии.

AsyncSubject выдает только последнее значение после завершения.

const subject = new AsyncSubject();
const observable = subject.asObservable();
observable.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.complete();
// Prints
// 2

BehaviorSubject позволяет указать значение (по умолчанию), которое будет отправлено каждому подписчику, если до сих пор не было отправлено другое значение. В противном случае подписчики получают последнее переданное значение.

const subject = new BehaviorSubject(1);
const observable = subject.asObservable();
const subscription1 = observable.subscribe(console.log);
subject.next(2);
subscription1.unsubscribe();
// Prints
// 1
// 2
const subscription2 = observable.subscribe(console.log);
// Prints
// 2

ReplaySubject хранит все передаваемые значения до определенного числа, времени или до бесконечности. Все новые подписчики получат все сохраненные значения.

const subject = new ReplaySubject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// Prints
// 1
// 2

Вы можете найти больше информации по темам в документации ReactiveX (там также есть дополнительные ссылки). Бен Леш предлагает некоторые идеи по предметам в О предмете предметов, как и Николас Джеймисон в RxJS: Понимание предметов.

Наблюдаемый

Вы можете создать наблюдаемое, просто используя оператор new. С помощью функции, которую вы передаете, вы можете управлять потоком. Эта функция вызывается всякий раз, когда кто-то подписывается, и она получает наблюдателя, которого вы можете использовать как субъект, то есть вызывать next, complete и error.

Вернемся к нашему примеру со списком:

const myList = new List();
myList.add(1);
myList.add(3);
new Observable(observer => {
  for (let i = 0; i<list.size; i++) {
    observer.next(list.get(i));
  }
  observer.complete();
})
.subscribe();
// Produces
// 1 3 |

Функция может возвращать функцию отказа от подписки, которая вызывается, когда подписчик отменяет подписку. Вы можете использовать его для очистки или выполнения некоторых завершающих действий.

new Observable(observer => {
  //stream it, baby!
  return () => {
                 //clean up
               };
})
.subscribe();

Подкласс Observable

До появления операторов lettable это был способ реализации пользовательских операторов. RxJS расширяет Observable внутри. Один из примеров - Тема, другой - оператор публикации. Он возвращает ConnectableObservable, который предоставляет дополнительный метод connect.

Реализовать подписку

Иногда у вас уже есть объект, который хранит состояние и может выдавать значения. Вы можете превратить его в наблюдаемое, если вы реализуете интерфейс Subscribable, который состоит только из метода подписки.

interface Subscribable<T> {
  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable
}

Объединить и выбрать существующие потоки

Недостаточно умения создавать отдельные потоки. Иногда вы сталкиваетесь с несколькими потоками, но вам нужен только один. Некоторые функции также доступны как операторы, поэтому я не буду здесь углубляться. Я могу порекомендовать статью от Max NgWizard K, в которой даже есть несколько причудливых анимаций.

Еще одна рекомендация: вы можете интерактивно играть с операторами комбинации на RxMarbles, перетаскивая элементы.

Тип ObservableInput

Операторы и функции, которые ожидают поток (или массив потоков), обычно работают не только с наблюдаемыми объектами. Вместо этого они фактически ожидают, что аргумент будет иметь тип ObservableInput, который определяется следующим образом:

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

Это означает, что вы можете, например, передавать обещания или массивы без необходимости сначала преобразовывать их в наблюдаемые!

откладывать

Основная цель - отложить создание наблюдаемого до того момента, когда кто-то захочет подписаться. Это полезно, если

  • создание наблюдаемого требует больших вычислительных ресурсов
  • вы хотите новую наблюдаемую для каждого подписчика
  • вы хотите выбирать между различными наблюдаемыми во время подписки
  • какой-то код не должен выполняться перед подпиской

Последний пункт включает один не столь очевидный вариант использования: обещания (defer также может возвращать обещание). Возьмем этот пример с использованием API выборки:

function getUser(id) {
  console.log("fetching data");
  return fetch(`https://server/user/${id}`);
}
const userPromise = getUser(1);
console.log("I don't want that request now");
//somewhere else
userPromise.then(response => console.log("done");
// Prints
// fetching data
// I don't want that request now
// done

Обещания выполняются немедленно, тогда как потоки выполняются при подписке. В тот момент, когда мы вызываем getUser, отправляется запрос, даже если мы не хотели этого в тот момент. Конечно, мы можем использовать from для преобразования обещания в наблюдаемое, но переданное нами обещание уже создано / выполнено. defer позволяет нам дождаться подписки:

const user$ = defer(() => getUser(1));
console.log("I don't want that request now");
//somewhere else
user$.subscribe(response => console.log("done");
// Prints
// I don't want that request now
// fetching data
// done

iif

iif описывает специальный вариант использования defer: выбор между двумя потоками во время подписки:

iif(
  () => new Date().getHours() < 12,
  of("AM"),
  of("PM")
)
.subscribe();
// Produces
// AM before noon, PM afterwards

Процитируем документацию:

На самом деле iif легко реализуется с помощью defer и существует только для удобства и удобочитаемости.

onErrorResumeNext

Запускает первый поток и, если он терпит неудачу, переходит к следующему потоку. Ошибка игнорируется.

const stream1$ = of(1, 2).pipe(
  tap(i => { if(i>1) throw 'error'}) //fail after first element
);
const stream2$ = of(3,4);
onErrorResumeNext(stream1$, stream2$)
  .subscribe(console.log);
// Produces
// 1 3 4 |

Это может быть полезно, если у вас более одной веб-службы. В случае выхода из строя основного из них автоматически может быть вызвана служба резервного копирования.

forkJoin

Позволяет потокам выполняться одновременно и по завершении выводит их последние значения в массив. Поскольку генерируются только последние значения каждого потока, он обычно используется для потоков, которые генерируют только один элемент, например HTTP-запросы. Вы хотите, чтобы запросы выполнялись параллельно и выполняли какие-либо действия, когда у всех есть ответы.

function handleResponses([user, account]) {
  // do something
}
forkJoin(
  fetch("https://server/user/1"),
  fetch("https://server/account/1")
)
.subscribe(handleResponses);

слияние / объединение

Испускает каждое значение, которое испускает одна из исходных наблюдаемых.

merge поддерживает параметр, позволяющий определить, на сколько исходных потоков подписывается одновременно. По умолчанию не ограничено. Значение 1 будет означать прослушивание одного исходного потока и, когда он будет завершен, подписаться на следующий. Поскольку это очень распространенный сценарий, RxJS предоставляет явную функцию: concat.

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2)),
  timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
  2 //two concurrent streams
)
.subscribe();
// Subscribes to stream 1 and 2 only
// prints
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms
// Stream 1 has completed, now subscribe to stream 3
// prints
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
  1
)
// is equal to
concat(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
)
// prints
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms

застежка-молния / комбинироватьПоследнее

В то время как merge и concat генерируют все значения из исходных потоков по отдельности, zip и combLatest объединяют одно значение каждого исходного потока и генерируют их вместе. zip объединяет первые значения, передаваемые всеми (!) исходными потоками, вторые значения и так далее. Это полезно, если содержимое потоков связано.

zip(
  interval(1000),
  interval(1200),
)
.subscribe();
// Produces
// [0, 0] [1, 1] [2, 2] ...

commonLatest аналогичен, но объединяет последние значения, выданные исходными потоками. Ничего не происходит, пока все исходные потоки не передадут хотя бы одно значение. С этого момента каждый раз, когда исходный поток передает значение, оно объединяется с последними значениями других потоков.

combineLatest(
  interval(1000),
  interval(1200),
)
.subscribe();
// Produces
// [0, 0] [1, 0] [1, 1] [2, 1] ...

Обе функции позволяют передать функцию-селектор, которая может объединять элементы с чем-то еще, кроме массива:

zip(
  interval(1000),
  interval(1200),
  (e1, e2) -> e1 + e2
)
.subscribe();
// Produces
// 0 2 4 6 ...

гонка

Выбирается первый поток, излучающий значение. Таким образом, результирующий поток - это, по сути, самый быстрый поток.

race(
  interval(1000),
  of("foo")
)
.subscribe();
// Produces
// foo |

Поскольку of производит значение немедленно, выбирается более быстрый поток и поток.

Заключение

Было много способов создавать наблюдаемые. Их знание необходимо, если вы хотите создавать реактивные API или сочетать устаревшие API с реактивными.

Я представил вам все варианты, но обо всех можно сказать гораздо больше. Если вы хотите погрузиться глубже, настоятельно рекомендую ознакомиться с документацией или прочитать предлагаемые статьи.

Еще один интересный способ получить представление - RxViz. Вы пишете код RxJS, и результирующие потоки затем отображаются графически и анимируются.