Ограничить поток предикатом

Существует ли потоковая операция Java 8, которая ограничивает (потенциально бесконечное) Stream до тех пор, пока первый элемент не перестанет соответствовать предикату?

В Java 9 мы можем использовать takeWhile, как в примере ниже, чтобы вывести все числа меньше 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

Поскольку в Java 8 такой операции нет, как лучше всего реализовать ее в целом?


person MForster    schedule 23.12.2013    source источник
comment
Возможная полезная информация по адресу: stackoverflow.com/q/19803058/248082   -  person nobeh    schedule 24.12.2013
comment
Связано: Эквивалент Scala dropWhile   -  person charlie    schedule 28.06.2016
comment
Мне интересно, как архитекторы могли когда-либо разобраться в том, для чего мы действительно можем использовать это, не сталкиваясь с этим вариантом использования. Начиная с Java 8 Streams действительно полезны только для существующих структур данных: - /   -  person Thorbjørn Ravn Andersen    schedule 04.10.2016
comment
См. Также Как отключить операцию reduce (). в потоке?   -  person Vadzim    schedule 16.07.2018
comment
С Java 9 было бы проще написать IntStream.iterate(1, n->n<10, n->n+1).forEach(System.out::print);   -  person Marc Dzaebel    schedule 08.02.2019


Ответы (19)


Операции takeWhile и dropWhile были добавлены в JDK 9. Ваш пример кода

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

будет вести себя именно так, как вы ожидаете, при компиляции и запуске под JDK 9.

Выпущен JDK 9. Его можно скачать здесь: JDK 9 Releases.

person Stuart Marks    schedule 31.08.2015
comment
Прямая ссылка на предварительную версию документации для JDK9 Stream с _1 _ / _ 2_: download.java.net/jdk9/docs/api/java/util/stream/Stream.html - person Miles; 16.12.2015
comment
Есть ли причина, по которой они называются takeWhile и dropWhile, а не limitWhile и skipWhile, для согласованности с существующим API? - person Lukas Eder; 07.01.2016
comment
@LukasEder takeWhile и dropWhile довольно широко распространены, встречаются в Scala, Python, Groovy, Ruby, Haskell и Clojure. Асимметрия с skip и limit прискорбна. Возможно, skip и limit следовало бы называть drop и take, но это не так интуитивно понятно, если вы уже не знакомы с Haskell. - person Stuart Marks; 08.01.2016
comment
@StuartMarks: Я понимаю, что dropXXX и takeXXX - более популярные термины, но я лично могу жить с более SQL-esque limitXXX и skipXXX. Я нахожу эту новую асимметрию гораздо более запутанной, чем индивидуальный выбор терминов ... :) (кстати: в Scala также есть drop(int) и take(int)) - person Lukas Eder; 08.01.2016
comment
да, позвольте мне просто перейти на Jdk 9 в производстве. Многие разработчики все еще используют Jdk8, такая функция должна была быть включена в Streams с самого начала. - person wilmol; 01.07.2019
comment
IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) можно упростить до IntStream .iterate(1, n -> n < 10, n -> n + 1) - person Holger; 28.11.2020
comment
@Holger это предпочтение, но я предпочитаю немного более подробный, так как легче понять, что происходит. - person Archimedes Trajano; 22.03.2021
comment
@ArchimedesTrajano в этом случае понятно, поскольку iterate(1, n -> n < 10, n -> n + 1) напрямую соответствует for(int n = 1; n < 10; n = n + 1) - person Holger; 23.03.2021

Такая операция должна быть возможна с Java 8 Stream, но не обязательно может быть выполнена эффективно - например, вы не можете обязательно распараллелить такую ​​операцию, так как вам нужно посмотреть на элементы по порядку.

API не предоставляет простого способа сделать это, но, вероятно, самый простой способ - взять Stream.iterator(), обернуть _ 3_, чтобы получить временную реализацию, а затем вернитесь к Spliterator, а затем к Stream. Или - возможно - оберните _ 6_, хотя в этой реализации его больше нельзя разделить.

Вот непроверенная реализация takeWhile на Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
person Louis Wasserman    schedule 24.12.2013
comment
Теоретически распараллелить takeWhile с предикатом без состояния просто. Оцените условие в параллельных пакетах (при условии, что предикат не вызывает и не имеет побочных эффектов, если выполняется несколько дополнительных раз). Проблема заключается в том, что это делается в контексте рекурсивной декомпозиции (структура fork / join), которую используют Streams. На самом деле, именно Streams ужасно неэффективны. - person Aleksandr Dubinsky; 26.12.2013
comment
Потоки были бы намного лучше, если бы они не были так озабочены автоматическим параллелизмом. Параллелизм нужен только в небольшой части мест, где можно использовать Streams. Кроме того, если бы Oracle так заботилась о перфорации, они могли бы заставить JVM JIT автовекторизоваться и получить гораздо больший прирост производительности, не беспокоя разработчиков. Теперь это правильно выполненный автоматический магический параллелизм. - person Aleksandr Dubinsky; 26.12.2013
comment
Вам следует обновить этот ответ сейчас, когда выпущена Java 9. - person Radiodef; 05.08.2018
comment
Нет, @Radiodef. Вопрос касается конкретно решения Java 8. - person Renato Back; 16.08.2018

allMatch() - это функция короткого замыкания, поэтому вы можете использовать ее для остановки обработки. Основным недостатком является то, что вам придется выполнять тест дважды: один раз, чтобы увидеть, нужно ли его обрабатывать, и еще раз, чтобы увидеть, продолжать ли.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);
person Michael Rowley    schedule 13.11.2014
comment
Сначала это показалось мне неинтуитивным (учитывая название метода), но документы подтверждают, что Stream.allMatch() является операция короткого замыкания. Таким образом, это будет выполнено даже в бесконечном потоке, таком как IntStream.iterate(). Конечно, оглядываясь назад, можно сказать, что это разумная оптимизация. - person Bailey Parker; 26.11.2015
comment
Это красиво, но я не думаю, что он очень хорошо передает, что его цель - тело peek. Если бы я столкнулся с этим в следующем месяце, я бы потратил минуту, чтобы задаться вопросом, почему программист передо мной проверил, allMatch, а затем проигнорировал ответ. - person Joshua Goldberg; 22.03.2017
comment
Недостатком этого решения является то, что оно возвращает логическое значение, поэтому вы не можете собирать результаты потока, как обычно. - person neXus; 06.10.2017

В продолжение ответа @StuartMarks. В моей библиотеке StreamEx есть _ 1_, совместимая с текущей реализацией JDK-9. При работе под JDK-9 он просто делегирует выполнение JDK (через MethodHandle.invokeExact, что очень быстро). При работе под JDK-8 будет использоваться реализация «polyfill». Итак, используя мою библиотеку, проблему можно решить так:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);
person Tagir Valeev    schedule 31.08.2015
comment
Почему вы не реализовали это для класса StreamEx? - person th0masb; 09.04.2018
comment
@Someguy Я это реализовал. - person Tagir Valeev; 12.04.2018

takeWhile - одна из функций, предоставляемых библиотекой protonpack.

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));
person Dominic Fox    schedule 04.09.2014

Обновление: Java 9 Stream теперь поставляется с takeWhile.

Нет необходимости во взломах или других решениях. Просто используйте это!


Я уверен, что это можно значительно улучшить: (возможно, кто-то может сделать его поточно-ориентированным)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Хак точно ... Не элегантно - но работает ~: D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}
person The Coordinator    schedule 25.12.2013

Вы можете использовать java8 + rxjava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));
person frhack    schedule 02.06.2015

На самом деле есть 2 способа сделать это в Java 8 без дополнительных библиотек или с использованием Java 9.

Если вы хотите напечатать на консоли числа от 2 до 20, вы можете сделать это:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

or

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

Результат в обоих случаях:

2
4
6
8
10
12
14
16
18
20

Никто еще не упомянул anyMatch. Это причина для этого сообщения.

person gil.fernandes    schedule 04.05.2017

Это источник, скопированный из JDK 9 java.util.stream.Stream.takeWhile (Predicate). Небольшая разница для работы с JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}
person martian    schedule 27.09.2017

Вот версия, сделанная на ints - как задано в вопросе.

Использование:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Вот код для StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}
person Chris Greenaway    schedule 02.07.2014

Перейдите к библиотеке AbacusUtil. Он предоставляет именно тот API, который вам нужен, и многое другое:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Декларация : Я разработчик AbacusUtil.

person user_3380739    schedule 30.11.2016

    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

вместо пика вы можете использовать mapToObj для возврата конечного объекта или сообщения

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);
person Oleksandr Potomkin    schedule 23.04.2019
comment
Это должен быть принятый ответ, если он работает стабильно. - person Whimusical; 15.01.2021

Вы не можете прервать поток, кроме как путем короткого замыкания терминальной операции, в результате чего некоторые значения потока останутся необработанными независимо от их значения. Но если вы просто хотите избежать операций с потоком, вы можете добавить в поток преобразование и фильтр:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

Это преобразует поток вещей в нулевые значения, когда объекты соответствуют некоторому условию, а затем отфильтровывает нули. Если вы хотите проявить побочные эффекты, вы можете установить значение условия в true, как только что-то встречается, чтобы все последующие вещи отфильтровывались независимо от их значения. Но даже если нет, вы можете сэкономить большую часть (если не всю) обработку, отфильтровав значения из потока, которые вы не хотите обрабатывать.

person Matthew    schedule 09.03.2017
comment
Плохо, что какой-то анонимный оценщик занизил мой ответ, не объяснив почему. Так что ни я, ни другие читатели не знают, что не так с моим ответом. В отсутствие их обоснования я считаю их критику недействительной, а мой ответ как опубликованный - правильным. - person Matthew; 25.03.2017
comment
Ваш ответ не решает проблему OP, которая имеет дело с бесконечными потоками. Это также, кажется, излишне усложняет ситуацию, поскольку вы можете записать условие в самом вызове filter (), не используя map (). В вопросе уже есть пример кода, просто попробуйте применить свой ответ к этому коду, и вы увидите, что программа будет повторяться бесконечно. - person SenoCtar; 04.04.2017

Даже у меня было подобное требование - вызвать веб-сервис, если он не работает, повторите попытку 3 раза. Если это не удается даже после стольких испытаний, отправьте уведомление по электронной почте. После того, как много погуглил, anyMatch() пришел как спаситель. Мой пример кода выглядит следующим образом. В следующем примере, если метод webServiceCall возвращает true в первой итерации, поток не повторяется дальше, как мы вызвали anyMatch(). Я считаю, что это то, что вы ищете.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}
person Chinmay Phadke    schedule 24.10.2017

Если вы знаете точное количество повторений, которое будет выполнено, вы можете сделать

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);
person Dilip Tharoor    schedule 24.09.2018
comment
Хотя это может ответить на вопрос авторов, в нем отсутствуют некоторые поясняющие слова и ссылки на документацию. Фрагменты исходного кода не очень полезны без некоторых фраз. Вы также можете найти как написать хороший ответ очень полезным. Пожалуйста, отредактируйте свой ответ. - person hellow; 24.09.2018

Если у вас другая проблема, может потребоваться другое решение, но для вашей текущей проблемы я бы просто выбрал:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);
person krmanish007    schedule 15.06.2017

Возможно, это немного не по теме, но это то, что у нас есть для List<T>, а не для Stream<T>.

Сначала вам нужен take метод util. Этот метод принимает первые n элемента:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

это работает как scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

теперь будет довольно просто написать takeWhile метод на основе take

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

это работает так:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

эта реализация частично повторяет список несколько раз, но не добавляет операций добавления O(n^2). Надеюсь, это приемлемо.

person Max    schedule 19.07.2018

У меня есть еще одно быстрое решение, реализовав это (что на самом деле довольно нечисто, но вы поняли идею):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}
person user2504380    schedule 08.10.2014
comment
Вы заранее оцениваете весь поток! И если current никогда .equals(e), вы получите бесконечный цикл. И то, и другое, даже если вы впоследствии примените, например, .limit(1). Это намного хуже, чем «нечисто». - person charlie; 28.06.2016

Вот моя попытка использовать только библиотеку Java Stream.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();
person climbing_bum    schedule 26.06.2015
comment
Предполагается, что предикат filter не имеет состояния. System.out.println - это побочный эффект. - person Radiodef; 05.08.2018