Потребление значений, одновременно испускаемых наблюдателем

Я изучаю реактивное программирование с помощью RxJava и хочу одновременно использовать переданные значения без блокировки в одном потоке выполнения.

        Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long counter) {
                    sleep(1000);
                    System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
                }
            });

    sleep(10000);

Я получу этот вывод

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1

как мне обрабатывать каждое событие в асинхронном режиме? как это

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5

person vach    schedule 23.01.2015    source источник


Ответы (1)


В Rx наблюдаемый объект представляет параллелизм1, поэтому для одновременной обработки уведомлений по отношению друг к другу вы должны спроецировать каждое уведомление в наблюдаемый объект.

flatMap — это оператор асинхронной последовательной композиции. Он проецирует каждое уведомление из наблюдаемого источника в наблюдаемое, что позволяет вам одновременно обрабатывать каждое входное значение. Затем он объединяет результаты каждого вычисления в сглаженную наблюдаемую последовательность с неперекрывающимися уведомлениями.

Дополнение:

В selector для flatMap часто существует несколько способов создания параллельного наблюдаемого объекта в зависимости от целевой платформы. Я не знаком с Java, но в .NET вы обычно либо используете Observable.Start для реализации параллелизма, либо асинхронный метод (async/await) для использования преимущества встроенной асинхронности, что часто предпочтительнее.

1 Технически, индивидуальная подписка (наблюдатель) для холодного наблюдаемого объекта обеспечивает параллелизм в Rx, хотя вместо этого часто удобнее думать в терминах наблюдаемых объектов. См. этот ответ для получения дополнительной информации.

person Dave Sexton    schedule 23.01.2015
comment
Не могли бы вы предложить изменение, которое мне нужно сделать, чтобы получить желаемое поведение? Я думаю, что это будет очень легко понять на реальном примере... - person vach; 23.01.2015
comment
Извините, я не знаю Java, но в .NET это выглядело бы примерно так: xs.FlatMap(x => ProcessAsync(x)).Subscribe(), где ProcessAsync — это метод, который принимает x в качестве аргумента и возвращает наблюдаемое значение любого типа. Вы также можете вызвать Subscribe без каких-либо параметров только из-за его побочных эффектов, предполагая, что вам не нужно обрабатывать возвращаемые значения каждого вызова ProcessAsync. - person Dave Sexton; 23.01.2015
comment
Я обновил свой ответ, чтобы включить дополнительную информацию. - person Dave Sexton; 23.01.2015