Изменяемые параметры в потоках Java 8

Глядя на этот вопрос: Как динамически выполнять фильтрацию в Java 8?

Проблема заключается в том, чтобы обрезать поток после выполнения фильтра. Я не могу использовать ограничение потому что я не знаю, какова длина списка после фильтра. Итак, можем ли мы посчитать элементы после фильтра?

Итак, я подумал, что могу создать класс, который подсчитывает и пропускает поток через карту. 8/22856514#22856514">Код находится в этом ответе.

Я создал класс, который учитывается, но оставляет элементы без изменений. Здесь я использую функцию, чтобы не использовать лямбда-выражения, которые я использовал в другом ответе:

class DoNothingButCount<T > implements Function<T, T> {
    AtomicInteger i;
    public DoNothingButCount() {
        i = new AtomicInteger(0);
    }
    public T apply(T p) {
        i.incrementAndGet();
        return p;
    }
}

Итак, мой поток был наконец:

persons.stream()
    .filter(u -> u.size > 12)
    .filter(u -> u.weitght > 12)
    .map(counter)
    .sorted((p1, p2) -> p1.age - p2.age)
    .collect(Collectors.toList())
    .stream()
    .limit((int) (counter.i.intValue() * 0.5))
    .sorted((p1, p2) -> p2.length - p1.length)
    .limit((int) (counter.i.intValue() * 0.5 * 0.2)).forEach((p) -> System.out.println(p));

Но мой вопрос касается другой части моего примера.

collect(Collectors.toList()).stream().

Если я удалю эту строку, последствиями будет то, что счетчик будет равен НУЛЮ, когда я попытаюсь выполнить limit. Я каким-то образом обманываю требование «эффективно окончательного», используя изменяемый объект.

Я могу ошибаться, но я понимаю, что поток создается первым, поэтому, если мы использовали изменяемые объекты для передачи параметров любому из шагов в потоке, они будут приняты при создании потока.

У меня вопрос, если мое предположение верно, то зачем это нужно? Поток (если он непараллельный) может проходить последовательно через все этапы (фильтр, сопоставление и т. д.), поэтому это ограничение не требуется.


person Raul Guiu    schedule 07.04.2014    source источник


Ответы (1)


Короткий ответ

У меня вопрос, если мое предположение верно, то зачем это нужно? Поток (если он непараллельный) может проходить последовательно через все этапы (фильтр, сопоставление и т. д.), поэтому это ограничение не требуется.

Как вы уже знаете, для параллельных потоков это звучит довольно очевидно: это ограничение необходимо, потому что в противном случае результат будет недетерминированным.

Что касается непараллельных потоков, это невозможно из-за их текущего дизайна: каждый элемент посещается только один раз. Если бы потоки работали так, как вы предлагаете, они выполняли бы каждый шаг для всей коллекции, прежде чем переходить к следующему шагу, что, я думаю, вероятно, оказало бы влияние на производительность. Я подозреваю, что именно поэтому разработчики языка приняли такое решение.


Почему это технически не работает без collect

Вы это уже знаете, но вот объяснение для других читателей. Из документов:

Потоки ленивы; вычисление исходных данных выполняется только при инициации операции терминала, а исходные элементы используются только по мере необходимости.

Каждая промежуточная операция Stream, такая как filter() или limit(), на самом деле является своего рода установщиком, который инициализирует параметры потока.

Когда вы вызываете операцию терминала, такую ​​как forEach(), collect() или count(), происходит вычисление, обрабатывая элементы в соответствии с ранее созданным конвейером.

Вот почему аргумент limit() оценивается до того, как один элемент пройдет первый шаг потока. Вот почему вам нужно завершить поток терминальной операцией и начать новый с limit(), который вы тогда узнаете.

Более подробный ответ о том, почему бы не разрешить параллельные потоки

Пусть ваш потоковый конвейер будет step X > step Y > step Z.

Мы хотим параллельной обработки наших предметов. Следовательно, если мы позволим поведению шага Y зависеть от элементов, которые уже прошли через X, то Y недетерминирован. Это связано с тем, что в момент поступления элемента на шаг Y набор элементов, уже прошедших через X, не будет одинаковым при нескольких выполнениях (из-за многопоточности).

Более подробный ответ о том, почему бы не разрешить это для непараллельных потоков

Поток по определению используется для обработки элементов потока. Вы можете думать о непараллельном потоке следующим образом: один элемент проходит через все шаги, затем следующий проходит через все шаги и т. д. На самом деле в документе все сказано:

Элементы потока посещаются только один раз в течение жизни потока. Как и в случае с итератором, необходимо создать новый поток для повторного посещения тех же элементов исходного кода.

Если бы потоки не работали так, было бы не лучше, чем просто выполнять каждый шаг для всей коллекции, прежде чем переходить к следующему шагу. Это фактически разрешит изменяемые параметры в непараллельных потоках, но это, вероятно, повлияет на производительность (поскольку мы будем перебирать коллекцию несколько раз). В любом случае, их текущее поведение не позволяет вам делать то, что вы хотите.

person Community    schedule 07.04.2014
comment
Вы сказали: если одному шагу в процессе нужно знать о результате обработки всех элементов на предыдущих шагах, тогда идея всего потока больше не работает. Почему? В связанном вопросе пользователь хочет получить верхние 50% после применения фильтра (поэтому мы не можем использовать ограничение, потому что мы не знаем, сколько их будет). Как вы думаете, почему это противоречит идее потоков? - person Raul Guiu; 07.04.2014
comment
Потому что, ИМХО, использование потока означает, что вы хотите обрабатывать каждый элемент одинаково (например, на автомобильном заводе: добавить материал в текущую машину, а затем передать его на следующий шаг). Здесь, если вам нужно знать результат в зависимости от всех элементов, то обработка не одинакова для каждого элемента. Вот почему необходима терминальная операция. - person Joffrey; 07.04.2014
comment
Звучит неплохо. Но у нас еще есть фильтр и лимит, после фильтра и лимита некоторые элементы не будут обрабатываться - person Raul Guiu; 07.04.2014
comment
Что ж, filter одинаково фильтрует все элементы, ему не нужно заранее знать обо всех элементах, достаточно знать, как выглядит текущий. - person Joffrey; 07.04.2014
comment
Что касается limit, то оно еще не зависит от будущего, нужно помнить только прошлые элементы. - person Joffrey; 07.04.2014
comment
Взгляните на отредактированную версию моего сообщения, может быть, вы будете более довольны;) - person Joffrey; 07.04.2014
comment
Да, теперь я понимаю, что вы имеете в виду. +1. Но я оставлю это непринятым на некоторое время, чтобы дать кому-то время привести более глубокие аргументы. - person Raul Guiu; 07.04.2014
comment
Аргумент «прошлое против будущего» вас недостаточно убеждает? :) - person Joffrey; 07.04.2014