Разница между потоками Java 8 и наблюдаемыми объектами RxJava

Сходны ли потоки Java 8 с наблюдаемыми объектами RxJava?

Определение потока Java 8:

Классы в новом пакете java.util.stream предоставляют Stream API для поддержки операций функционального стиля с потоками элементов.


person rahulrv    schedule 13.05.2015    source источник
comment
К вашему сведению, есть предложения ввести больше классов, подобных RxJava, в JDK 9. jsr166-concurrency.10961.n7.nabble.com/   -  person John Vint    schedule 13.05.2015
comment
@JohnVint Каков статус этого предложения. Он действительно полетит?   -  person IgorGanapolsky    schedule 08.03.2016
comment
@IgorGanapolsky О да, похоже, что он попадет в jdk9. cr.openjdk.java.net/~martin/webrevs / openjdk9 /. Существует даже порт для RxJava для Flow github.com/akarnokd/RxJavaUtilConcurrentFlow.   -  person John Vint    schedule 08.03.2016
comment
Я знаю, что это действительно старый вопрос, но я недавно присутствовал на этом замечательном выступлении Венката Субраманиама, в котором есть проницательный взгляд на эту тему и который обновлен до Java9: youtube.com/watch?v=kfSSKM9y_0E. Может быть интересно тем, кто разбирается в RxJava.   -  person Pedro    schedule 19.07.2018


Ответы (7)


Короткий ответ

Все библиотеки обработки последовательностей / потоков предлагают очень похожий API для построения конвейера. Отличия заключаются в API для обработки многопоточности и в составе конвейеров.

Длинный ответ

RxJava сильно отличается от Stream. Из всех вещей JDK наиболее близким к rx.Observable является, пожалуй, комбинация java.util.stream.Collector Stream + CompletableFuture (которая требует работы с дополнительным уровнем монад, т. Е. Необходимостью обрабатывать преобразование между Stream<CompletableFuture<T>> и CompletableFuture<Stream<T>>).

Между Observable и Stream есть существенные различия:

  • Потоки основаны на вытягивании, Observables - на выталкивании. Это может показаться слишком абстрактным, но имеет серьезные последствия, которые очень конкретны.
  • Stream можно использовать только один раз, на Observable можно подписаться много раз.
  • Stream#parallel() разбивает последовательность на разделы, Observable#subscribeOn() и Observable#observeOn() нет; сложно имитировать Stream#parallel() поведение с помощью Observable, у него когда-то был метод .parallel(), но этот метод вызвал такую ​​путаницу, что .parallel() поддержка была перемещена в отдельный репозиторий: ReactiveX / RxJavaParallel: экспериментальные параллельные расширения для RxJava. Дополнительные сведения см. В другом ответе.
  • Stream#parallel() не позволяет указать используемый пул потоков, в отличие от большинства методов RxJava, принимающих дополнительный планировщик. Поскольку все экземпляры потоков в JVM используют один и тот же пул fork-join, добавление .parallel() может случайно повлиять на поведение в другом модуле вашей программы.
  • В потоках отсутствуют связанные со временем операции, такие как Observable#interval(), Observable#window() и многие другие; это в основном потому, что потоки основаны на вытягивании, и вышестоящий не контролирует когда передавать следующий элемент в нисходящий поток.
  • Потоки предлагают ограниченный набор операций по сравнению с RxJava. Например. В потоках отсутствуют операции отключения (takeWhile(), takeUntil()); обходной путь с использованием Stream#anyMatch() ограничен: это терминальная операция, поэтому вы не можете использовать его более одного раза на поток
  • В JDK 8 нет операции Stream#zip(), которая иногда бывает весьма полезной.
  • Сложно создать потоки самостоятельно. Observable можно создать разными способами. РЕДАКТИРОВАТЬ: Как отмечалось в комментариях, существуют способы создания потока. Однако, поскольку нет короткого замыкания без клемм, вы не можете e. грамм. легко генерировать поток строк в файле (хотя JDK предоставляет Files#lines() и BufferedReader#lines() из коробки, и другими подобными сценариями можно управлять, создав Stream из Iterator).
  • Observable предлагает средство управления ресурсами (Observable#using()); вы можете обернуть им поток ввода-вывода или мьютекс и быть уверенным, что пользователь не забудет освободить ресурс - он будет автоматически удален при прекращении подписки; Stream имеет onClose(Runnable) метод, но вы должны вызывать его вручную или через try-with-resources. E. g. помните, что Files#lines() должен быть заключен в блок try-with-resources.
  • Observables полностью синхронизируются (на самом деле я не проверял, верно ли то же самое для Streams). Это избавляет вас от размышлений о том, являются ли базовые операции потокобезопасными (ответ всегда «да», если только нет ошибки), но накладные расходы, связанные с параллелизмом, будут присутствовать, независимо от того, нужно ли это вашему коду или нет.

Округлять

RxJava существенно отличается от Streams. Настоящие альтернативы RxJava - это другие реализации ReactiveStreams, e. грамм. соответствующая часть Akka.

Обновлять

Есть трюк с использованием нестандартного пула fork-join для Stream#parallel, см. Custom пул потоков в параллельном потоке Java 8.

Обновлять

Все вышеперечисленное основано на опыте работы с RxJava 1.x. Теперь, когда RxJava 2.x здесь, этот ответ может быть устаревшим.

person Kirill Gamazkov    schedule 02.03.2016
comment
Почему сложно построить потоки? Согласно этой статье, это кажется простым: oracle.com/technetwork/articles/java/ - person IgorGanapolsky; 08.03.2016
comment
Существует довольно много классов, у которых есть метод «потока»: коллекции, входные потоки, файлы каталогов и т. Д. Но что, если вы хотите создать поток из пользовательского цикла - скажем, перебирая курсор базы данных? Лучший способ, который я нашел до сих пор, - это создать Iterator, обернуть его Spliterator и, наконец, вызвать StreamSupport # fromSpliterator. Слишком много клея для простого дела ИМХО. Еще есть Stream.iterate, но он создает бесконечный поток. Единственный способ отключить поток в этом случае - Stream # anyMatch, но это терминальная операция, поэтому вы не можете разделить производителя потока и потребителя. - person Kirill Gamazkov; 08.03.2016
comment
RxJava имеет Observable.fromCallable, Observable.create и так далее. Или вы можете безопасно создать бесконечный Observable, а затем сказать '.takeWhile (condition)', и вы можете отправить эту последовательность потребителям. - person Kirill Gamazkov; 08.03.2016
comment
Самостоятельно построить потоки несложно. Вы можете просто вызвать Stream.generate() и передать свою собственную Supplier<U> реализацию, всего один простой метод, с помощью которого вы предоставите следующий элемент в потоке. Есть множество других методов. Чтобы легко построить последовательность Stream, которая зависит от предыдущих значений, вы можете использовать метод interate(), каждый Collection имеет метод stream(), а Stream.of() создает Stream из varargs или массива. Наконец, StreamSupport имеет поддержку более сложного создания потоков с использованием разделителей или для потоков примитивных типов. - person jbx; 01.11.2016
comment
В потоках отсутствуют операции отключения (takeWhile(), takeUntil()); - Я полагаю, что в JDK9 они есть в takeWhile () и dropWhile () - person Abdul; 30.11.2017
comment
Набор операций Stream не так богат, как у RxJava, Akka Streaams, ProjectReactor и т. Д. TakeWhile - это просто пример - person Kirill Gamazkov; 30.11.2017
comment
Что такое монада? - person Brian Joseph Spinos; 14.09.2019
comment
@BrianJosephSpinos: ну, в контексте моего ответа это была глупая попытка построить некую аналогию и выразить Rx через стандартные условия JDK. Если вам нужно определение монады, попробуйте stackoverflow.com/questions/2704652/ или en.wikipedia.org/wiki/Monad_ (функциональное_программирование). Сложность здесь в том, что монада настолько абстрактна, что ее трудно объяснить простыми (конкретными) словами. - person Kirill Gamazkov; 16.09.2019
comment
RxJava обрабатывает ошибки как первоклассный гражданин с onError, как и onNext для данных. В Java Streams дело обстоит иначе. - person Vipul Jain; 14.06.2020

Java 8 Stream и RxJava выглядят очень похоже. У них есть похожие операторы (фильтр, карта, flatMap ...), но они не созданы для одного и того же использования.

Вы можете выполнять асинхронные задачи, используя RxJava.

С помощью потока Java 8 вы будете перемещаться по элементам своей коллекции.

Вы можете делать почти то же самое в RxJava (перемещаться по элементам коллекции), но, поскольку RxJava ориентирован на параллельную задачу, ... он использует синхронизацию, защелку, ... Таким образом, та же задача с использованием RxJava может быть медленнее, чем с потоком Java 8.

RxJava можно сравнить с CompletableFuture, но он может вычислять более одного значения.

person dwursteisen    schedule 13.05.2015
comment
Стоит отметить, что ваше утверждение об обходе потока верно только для непараллельного потока. parallelStream поддерживает аналогичную синхронизацию простых переходов / карт / фильтрации и т. Д. - person John Vint; 13.05.2015
comment
Я не думаю, что та же задача с использованием RxJava может быть медленнее, чем с потоком Java 8. верно повсеместно, это сильно зависит от поставленной задачи. - person daschl; 09.02.2016
comment
Я рад, что вы сказали, что та же задача с использованием RxJava может выполняться медленнее, чем с потоком Java 8. Это очень важное различие, о котором многие пользователи RxJava не знают. - person IgorGanapolsky; 08.03.2016
comment
RxJava по умолчанию синхронный. Есть ли у вас какие-либо тесты, подтверждающие ваше утверждение, что он может быть медленнее? - person Marcin Koziński; 20.05.2016
comment
@ marcin-koziński, вы можете проверить этот тест: twitter.com/akarnokd/status/752465265091309568 - person dwursteisen; 23.07.2016
comment
Было бы неплохо иметь список вариантов использования, рекомендующий использование потоков J8 или Rx для каждого из них. - person Stephane; 02.12.2019

Есть несколько технических и концептуальных различий, например, потоки Java 8 представляют собой одноразовые, основанные на извлечении, синхронные последовательности значений, тогда как наблюдаемые объекты RxJava являются повторно наблюдаемыми, адаптивно основанными на push-pull, потенциально асинхронными последовательностями значений. RxJava нацелен на Java 6+ и также работает на Android.

person akarnokd    schedule 13.05.2015
comment
Типичный код, включающий RxJava, интенсивно использует лямбда-выражения, которые доступны только с Java 8. Таким образом, вы можете использовать Rx с Java 6, но код будет шумным - person Kirill Gamazkov; 03.03.2016
comment
Аналогичное различие заключается в том, что Rx Observables могут оставаться в живых неопределенно долго, пока не отменится подписка. По умолчанию потоки Java 8 завершаются операциями. - person IgorGanapolsky; 08.03.2016
comment
@KirillGamazkov, вы можете использовать retrolambda, чтобы сделать ваш код красивее при ориентации на Java 6. - person Marcin Koziński; 20.05.2016
comment
Котлин выглядит даже сексуальнее, чем дооснащение - person Kirill Gamazkov; 30.11.2017

Потоки Java 8 основаны на вытягивании. Вы перебираете поток Java 8, потребляя каждый элемент. И это мог быть бесконечный поток.

RXJava Observable по умолчанию основан на push. Вы подписываетесь на Observable, и вы получите уведомление, когда прибудет следующий элемент (onNext), или когда поток будет завершен (onCompleted), или когда произошла ошибка (onError). Поскольку с Observable вы получаете события onNext, onCompleted, onError, вы можете выполнять некоторые мощные функции, такие как объединение разных Observable в новый (zip, merge, concat). Другие вещи, которые вы могли бы сделать, - это кеширование, регулирование ... И он использует более или менее один и тот же API на разных языках (RxJava, RX на C #, RxJS, ...)

По умолчанию RxJava однопоточный. Если вы не начнете использовать Планировщики, все будет происходить в одном потоке.

person Bart De Neuter    schedule 15.09.2015
comment
в Stream у вас есть forEach, это почти то же самое, что и onNext - person paul; 12.12.2015
comment
На самом деле потоки обычно являются терминальными. Операции, закрывающие конвейер потока, называются терминальными операциями. Они производят результат из конвейера, такого как List, Integer или даже void (любой тип, отличный от Stream). ~ oracle.com/technetwork/articles / java / - person IgorGanapolsky; 08.03.2016

Существующие ответы исчерпывающие и правильные, но наглядного примера для начинающих не хватает. Позвольте мне добавить некоторые конкретные термины, такие как «выталкивающий / выталкивающий» и «повторно наблюдаемый». Примечание: я ненавижу термин Observable (ради всего святого, это поток), поэтому буду просто относиться к потокам J8 и RX.

Рассмотрим список целых чисел,

digits = [1,2,3,4,5]

J8 Stream - это утилита для изменения коллекции. Например, четные цифры могут быть извлечены как,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

По сути, это map, filter, reduce, очень хороший (и давно назревший ) дополнение к Java. Но что, если бы цифры не собирались заранее - что, если бы цифры передавались во время работы приложения - можно ли фильтровать четности в реальном времени.

Представьте, что отдельный процесс потока выводит целые числа в случайное время во время работы приложения (--- обозначает время)

digits = 12345---6------7--8--9-10--------11--12

В RX even может реагировать на каждую новую цифру и применять фильтр в реальном времени.

even = -2-4-----6---------8----10------------12

Нет необходимости хранить списки ввода и вывода. Если вы хотите получить список вывода, нет проблем с возможностью потоковой передачи. Фактически, все является потоком.

evens_stored = even.collect()  

Вот почему такие термины, как «без состояния» и «функциональный», больше связаны с RX.

person Adam Hughes    schedule 05.05.2017
comment
Но 5 - это даже не… И это похоже на то, что J8 Stream синхронный, а Rx Stream - асинхронный? - person Franklin Yu; 05.05.2018
comment
@FranklinYu, спасибо, я исправил 5 опечаток. Если мыслить меньше с точки зрения синхронного и асинхронного, хотя это может быть правильно, и больше с точки зрения императивного и функционального. В J8 вы сначала собираете все свои предметы, а затем применяете фильтр. В RX вы определяете функцию фильтра независимо от данных, а затем связываете ее с четным источником (живым потоком или коллекцией java) ... это совершенно другая модель программирования - person Adam Hughes; 11.01.2019
comment
Меня это очень удивляет. Я почти уверен, что потоки Java можно сделать из потоковой передачи данных. Почему вы думаете об обратном? - person Vic Seedoubleyew; 22.05.2019

RxJava также тесно связан с инициативой реактивных потоков и считает себя простая реализация API реактивных потоков (например, по сравнению с Реализация потоков Akka). Основное отличие состоит в том, что реактивные потоки предназначены для того, чтобы выдерживать обратное давление, но если вы посмотрите на страницу реактивных потоков, вы поймете идею. Они довольно хорошо описывают свои цели, и потоки также тесно связаны с реактивным манифестом.

Потоки Java 8 в значительной степени являются реализацией неограниченной коллекции, очень похожей на Scala Stream или Clojure lazy seq.

person Niclas Meier    schedule 16.09.2015

Потоки Java 8 позволяют эффективно обрабатывать действительно большие коллекции, используя многоядерные архитектуры. Напротив, RxJava по умолчанию является однопоточным (без планировщиков). Таким образом, RxJava не будет использовать преимущества многоядерных машин, если вы сами не запрограммируете эту логику.

person IgorGanapolsky    schedule 08.03.2016
comment
По умолчанию поток также является однопоточным, если вы не вызываете .parallel (). Кроме того, Rx дает больший контроль над параллелизмом. - person Kirill Gamazkov; 30.01.2017
comment
@KirillGamazkov Kotlin Coroutines Flow (на основе Java8 Streams) теперь поддерживает структурированный параллелизм: kotlinlang. org / docs / reference / coroutines / flow.html #flow - person IgorGanapolsky; 02.12.2019
comment
Верно, но я ничего не сказал о Flow и структурированном параллелизме. Мои две точки: 1) и Stream, и Rx являются однопоточными, если вы явно не измените это; 2) Rx дает вам детальный контроль над тем, какой шаг выполнять в каком пуле потоков, в отличие от Streams, позволяя вам только сказать, сделать его как-то параллельным - person Kirill Gamazkov; 16.12.2019
comment
Я действительно не понимаю, для чего вам нужен пул потоков. Как вы сказали, для эффективной обработки действительно больших коллекций. Или, может быть, я хочу, чтобы часть задачи, связанная с вводом-выводом, выполнялась в отдельном пуле потоков. Не думаю, что я понял смысл вашего вопроса. Попробуйте снова? - person Kirill Gamazkov; 31.12.2019
comment
Статические методы в классе Schedulers позволяют получать предопределенные пулы потоков, а также создавать пулы из Executor. См. reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/ - person Kirill Gamazkov; 10.01.2020
comment
@KirillGamazkov Дело в том, чтобы использовать многоядерность. Не многопоточные пулы. - person IgorGanapolsky; 15.10.2020