Сходны ли потоки Java 8 с наблюдаемыми объектами RxJava?
Определение потока Java 8:
Классы в новом пакете
java.util.stream
предоставляют Stream API для поддержки операций функционального стиля с потоками элементов.
Сходны ли потоки Java 8 с наблюдаемыми объектами RxJava?
Определение потока Java 8:
Классы в новом пакете
java.util.stream
предоставляют Stream API для поддержки операций функционального стиля с потоками элементов.
Все библиотеки обработки последовательностей / потоков предлагают очень похожий API для построения конвейера. Отличия заключаются в API для обработки многопоточности и в составе конвейеров.
RxJava сильно отличается от Stream. Из всех вещей JDK наиболее близким к rx.Observable
является, пожалуй, комбинация java.util.stream.Collector
Stream
+ CompletableFuture
(которая требует работы с дополнительным уровнем монад, т. Е. Необходимостью обрабатывать преобразование между Stream<CompletableFuture<T>>
и CompletableFuture<Stream<T>>
).
Между Observable и Stream есть существенные различия:
Stream#parallel()
разбивает последовательность на разделы, Observable#subscribeOn()
и Observable#observeOn()
нет; сложно имитировать Stream#parallel()
поведение с помощью Observable, у него когда-то был метод .parallel()
, но этот метод вызвал такую путаницу, что .parallel()
поддержка была перемещена в отдельный репозиторий: ReactiveX / RxJavaParallel: экспериментальные параллельные расширения для RxJava. Дополнительные сведения см. В другом ответе.Stream#parallel()
не позволяет указать используемый пул потоков, в отличие от большинства методов RxJava, принимающих дополнительный планировщик. Поскольку все экземпляры потоков в JVM используют один и тот же пул fork-join, добавление .parallel()
может случайно повлиять на поведение в другом модуле вашей программы.Observable#interval()
, Observable#window()
и многие другие; это в основном потому, что потоки основаны на вытягивании, и вышестоящий не контролирует когда передавать следующий элемент в нисходящий поток.takeWhile()
, takeUntil()
); обходной путь с использованием Stream#anyMatch()
ограничен: это терминальная операция, поэтому вы не можете использовать его более одного раза на потокStream#zip()
, которая иногда бывает весьма полезной.Files#lines()
и BufferedReader#lines()
из коробки, и другими подобными сценариями можно управлять, создав Stream из Iterator).Observable#using()
); вы можете обернуть им поток ввода-вывода или мьютекс и быть уверенным, что пользователь не забудет освободить ресурс - он будет автоматически удален при прекращении подписки; Stream имеет onClose(Runnable)
метод, но вы должны вызывать его вручную или через try-with-resources. E. g. помните, что Files#lines()
должен быть заключен в блок try-with-resources.RxJava существенно отличается от Streams. Настоящие альтернативы RxJava - это другие реализации ReactiveStreams, e. грамм. соответствующая часть Akka.
Есть трюк с использованием нестандартного пула fork-join для Stream#parallel
, см. Custom пул потоков в параллельном потоке Java 8.
Все вышеперечисленное основано на опыте работы с RxJava 1.x. Теперь, когда RxJava 2.x здесь, этот ответ может быть устаревшим.
Stream.generate()
и передать свою собственную Supplier<U>
реализацию, всего один простой метод, с помощью которого вы предоставите следующий элемент в потоке. Есть множество других методов. Чтобы легко построить последовательность Stream
, которая зависит от предыдущих значений, вы можете использовать метод interate()
, каждый Collection
имеет метод stream()
, а Stream.of()
создает Stream
из varargs или массива. Наконец, StreamSupport
имеет поддержку более сложного создания потоков с использованием разделителей или для потоков примитивных типов.
- person jbx; 01.11.2016
takeWhile()
, takeUntil()
); - Я полагаю, что в JDK9 они есть в takeWhile () и dropWhile ()
- person Abdul; 30.11.2017
Java 8 Stream и RxJava выглядят очень похоже. У них есть похожие операторы (фильтр, карта, flatMap ...), но они не созданы для одного и того же использования.
Вы можете выполнять асинхронные задачи, используя RxJava.
С помощью потока Java 8 вы будете перемещаться по элементам своей коллекции.
Вы можете делать почти то же самое в RxJava (перемещаться по элементам коллекции), но, поскольку RxJava ориентирован на параллельную задачу, ... он использует синхронизацию, защелку, ... Таким образом, та же задача с использованием RxJava может быть медленнее, чем с потоком Java 8.
RxJava можно сравнить с CompletableFuture
, но он может вычислять более одного значения.
parallelStream
поддерживает аналогичную синхронизацию простых переходов / карт / фильтрации и т. Д.
- person John Vint; 13.05.2015
Есть несколько технических и концептуальных различий, например, потоки Java 8 представляют собой одноразовые, основанные на извлечении, синхронные последовательности значений, тогда как наблюдаемые объекты RxJava являются повторно наблюдаемыми, адаптивно основанными на push-pull, потенциально асинхронными последовательностями значений. RxJava нацелен на Java 6+ и также работает на Android.
Потоки Java 8 основаны на вытягивании. Вы перебираете поток Java 8, потребляя каждый элемент. И это мог быть бесконечный поток.
RXJava Observable
по умолчанию основан на push. Вы подписываетесь на Observable, и вы получите уведомление, когда прибудет следующий элемент (onNext
), или когда поток будет завершен (onCompleted
), или когда произошла ошибка (onError
). Поскольку с Observable
вы получаете события onNext
, onCompleted
, onError
, вы можете выполнять некоторые мощные функции, такие как объединение разных Observable
в новый (zip
, merge
, concat
). Другие вещи, которые вы могли бы сделать, - это кеширование, регулирование ... И он использует более или менее один и тот же API на разных языках (RxJava, RX на C #, RxJS, ...)
По умолчанию RxJava однопоточный. Если вы не начнете использовать Планировщики, все будет происходить в одном потоке.
Существующие ответы исчерпывающие и правильные, но наглядного примера для начинающих не хватает. Позвольте мне добавить некоторые конкретные термины, такие как «выталкивающий / выталкивающий» и «повторно наблюдаемый». Примечание: я ненавижу термин Observable
(ради всего святого, это поток), поэтому буду просто относиться к потокам J8 и RX.
Рассмотрим список целых чисел,
digits = [1,2,3,4,5]
J8 Stream - это утилита для изменения коллекции. Например, четные цифры могут быть извлечены как,
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
По сути, это map, filter, reduce, очень хороший (и давно назревший ) дополнение к Java. Но что, если бы цифры не собирались заранее - что, если бы цифры передавались во время работы приложения - можно ли фильтровать четности в реальном времени.
Представьте, что отдельный процесс потока выводит целые числа в случайное время во время работы приложения (---
обозначает время)
digits = 12345---6------7--8--9-10--------11--12
В RX even
может реагировать на каждую новую цифру и применять фильтр в реальном времени.
even = -2-4-----6---------8----10------------12
Нет необходимости хранить списки ввода и вывода. Если вы хотите получить список вывода, нет проблем с возможностью потоковой передачи. Фактически, все является потоком.
evens_stored = even.collect()
Вот почему такие термины, как «без состояния» и «функциональный», больше связаны с RX.
RxJava также тесно связан с инициативой реактивных потоков и считает себя простая реализация API реактивных потоков (например, по сравнению с Реализация потоков Akka). Основное отличие состоит в том, что реактивные потоки предназначены для того, чтобы выдерживать обратное давление, но если вы посмотрите на страницу реактивных потоков, вы поймете идею. Они довольно хорошо описывают свои цели, и потоки также тесно связаны с реактивным манифестом.
Потоки Java 8 в значительной степени являются реализацией неограниченной коллекции, очень похожей на Scala Stream или Clojure lazy seq.
Потоки Java 8 позволяют эффективно обрабатывать действительно большие коллекции, используя многоядерные архитектуры. Напротив, RxJava по умолчанию является однопоточным (без планировщиков). Таким образом, RxJava не будет использовать преимущества многоядерных машин, если вы сами не запрограммируете эту логику.