Получение следующего элемента из потока Java 8

Я хотел бы получить и удалить следующий элемент из Java 8 Stream, не закрывая этот Stream.

Stream<Integer> integerStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Integer zero = integerStream.getNext(); // 0
Integer one  = integerStream.getNext(); // 1
...

Это возможно?


person Abdull    schedule 27.10.2014    source источник
comment
Вы действительно хотите использовать удаленные элементы или просто выбросить их? Для последнего .skip(n) это способ сделать это.   -  person David Conrad    schedule 27.10.2014


Ответы (3)


Да, есть способ сделать это, но с некоторыми ограничениями.

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Iterator<Integer> iter = infiniteStream.iterator();
Integer zero = iter.next();
Integer one  = iter.next();

Альтернативно,

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Spliterator<Integer> spliterator = infiniteStream.spliterator();
spliterator.tryAdvance(i -> System.out.println(i)); // zero
spliterator.tryAdvance(i -> System.out.println(i)); // one

Имея Stream, можно получить из него Iterator или Spliterator, или запросить, является ли это параллельным потоком и т. д. Они определены в BaseStream, суперинтерфейс Stream, из-за чего их легко не заметить.

В этом случае мы знаем, что поток бесконечен, поэтому нет необходимости вызывать метод hasNext() Итератора или проверять возвращаемое значение tryAdvance() Разделителя.

Ограничение состоит в том, что оба метода iterator() и spliterator() Stream являются терминальными операциями, что означает, что после их вызова возвращаемый Iterator или Spliterator имеет монопольный доступ к значениям, представленным Stream. Дальнейшие операции с потоком (такие как filter или map и т. д.) не разрешены и будут встречаться с IllegalStateException.

Если вы хотите отделить первую пару элементов, а затем возобновить обработку потока, вы можете превратить разделитель обратно в поток следующим образом:

Stream<Integer> stream2 = StreamSupport.stream(spliterator, false);

Это, вероятно, будет хорошо работать для некоторых вещей, но я не уверен, что рекомендую эту технику в целом. Я думаю, что это добавляет несколько дополнительных объектов и, следовательно, дополнительные вызовы методов на пути создания следующего элемента.

Комментарии редакции (не относящиеся к вашему вопросу):

  • Не используйте new Integer(val). Вместо этого используйте Integer.valueOf(val), который будет повторно использовать упакованное целое число, если оно доступно, что обычно верно для значений в диапазоне от -128 до 127.
  • Вы можете использовать IntStream вместо Stream<Integer>, что полностью исключает накладные расходы на упаковку. У него нет полного набора потоковых операций, но есть iterate(), который принимает функцию, работающую с примитивными значениями int.
person Stuart Marks    schedule 27.10.2014
comment
Или вы можете просто использовать x -> x + 1, который короче, а также использует Integer.valueOf (только что проверено с 8u25 и javap -c -p). - person David Conrad; 27.10.2014

На основе ответа Стюарта и преобразование итератора в поток, я придумал следующий класс-оболочку. Он не тестировался и не является потокобезопасным, но он дает мне то, что мне сейчас нужно: удаление и использование отдельных элементов, сохраняя при этом этот поток "открытым".

PeelingStream<T> предоставляет метод T getNext(), который скрывает семантику someWrappedStream.iterator() операции терминального потока:

public class PeelingStream<T> implements Stream<T> {

    private Stream<T> wrapped;

    public PeelingStream(Stream<T> toBeWrapped) {
        this.wrapped = toBeWrapped;
    }

    public T getNext() {
        Iterator<T> iterator = wrapped.iterator();
        T next = iterator.next();
        Iterable<T> remainingIterable = () -> iterator;
        wrapped = StreamSupport.stream(remainingIterable.spliterator(),
                false);

        return next;
    }

    ///////////////////// from here, only plain delegate methods

    public Iterator<T> iterator() {
        return wrapped.iterator();
    }

    public Spliterator<T> spliterator() {
        return wrapped.spliterator();
    }

    public boolean isParallel() {
        return wrapped.isParallel();
    }

    public Stream<T> sequential() {
        return wrapped.sequential();
    }

    public Stream<T> parallel() {
        return wrapped.parallel();
    }

    public Stream<T> unordered() {
        return wrapped.unordered();
    }

    public Stream<T> onClose(Runnable closeHandler) {
        return wrapped.onClose(closeHandler);

    }

    public void close() {
        wrapped.close();
    }

    public Stream<T> filter(Predicate<? super T> predicate) {
        return wrapped.filter(predicate);
    }

    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return wrapped.map(mapper);
    }

    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return wrapped.mapToInt(mapper);
    }

    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return wrapped.mapToLong(mapper);
    }

    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return wrapped.mapToDouble(mapper);
    }

    public <R> Stream<R> flatMap(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        return wrapped.flatMap(mapper);
    }

    public IntStream flatMapToInt(
            Function<? super T, ? extends IntStream> mapper) {
        return wrapped.flatMapToInt(mapper);
    }

    public LongStream flatMapToLong(
            Function<? super T, ? extends LongStream> mapper) {
        return wrapped.flatMapToLong(mapper);
    }

    public DoubleStream flatMapToDouble(
            Function<? super T, ? extends DoubleStream> mapper) {
        return wrapped.flatMapToDouble(mapper);
    }

    public Stream<T> distinct() {
        return wrapped.distinct();
    }

    public Stream<T> sorted() {
        return wrapped.sorted();
    }

    public Stream<T> sorted(Comparator<? super T> comparator) {
        return wrapped.sorted(comparator);
    }

    public Stream<T> peek(Consumer<? super T> action) {
        return wrapped.peek(action);
    }

    public Stream<T> limit(long maxSize) {
        return wrapped.limit(maxSize);
    }

    public Stream<T> skip(long n) {
        return wrapped.skip(n);
    }

    public void forEach(Consumer<? super T> action) {
        wrapped.forEach(action);
    }

    public void forEachOrdered(Consumer<? super T> action) {
        wrapped.forEachOrdered(action);
    }

    public Object[] toArray() {
        return wrapped.toArray();
    }

    public <A> A[] toArray(IntFunction<A[]> generator) {
        return wrapped.toArray(generator);
    }

    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return wrapped.reduce(identity, accumulator);
    }

    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return wrapped.reduce(accumulator);
    }

    public <U> U reduce(U identity,
            BiFunction<U, ? super T, U> accumulator,
            BinaryOperator<U> combiner) {
        return wrapped.reduce(identity, accumulator, combiner);
    }

    public <R> R collect(Supplier<R> supplier,
            BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return wrapped.collect(supplier, accumulator, combiner);
    }

    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return wrapped.collect(collector);
    }

    public Optional<T> min(Comparator<? super T> comparator) {
        return wrapped.min(comparator);
    }

    public Optional<T> max(Comparator<? super T> comparator) {
        return wrapped.max(comparator);
    }

    public long count() {
        return wrapped.count();
    }

    public boolean anyMatch(Predicate<? super T> predicate) {
        return wrapped.anyMatch(predicate);
    }

    public boolean allMatch(Predicate<? super T> predicate) {
        return wrapped.allMatch(predicate);
    }

    public boolean noneMatch(Predicate<? super T> predicate) {
        return wrapped.noneMatch(predicate);
    }

    public Optional<T> findFirst() {
        return wrapped.findFirst();
    }

    public Optional<T> findAny() {
        return wrapped.findAny();
    }

}

Небольшой тест:

@Test
public void testPeelingOffItemsFromStream() {

    Stream<Integer> infiniteStream = Stream.iterate(0, x -> x + 1);

    PeelingStream<Integer> peelingInfiniteStream = new PeelingStream<>(infiniteStream);

    Integer one = peelingInfiniteStream.getNext();
    assertThat(one, equalTo(0));

    Integer two = peelingInfiniteStream.getNext();
    assertThat(two, equalTo(1));

    Stream<Integer> limitedStream = peelingInfiniteStream.limit(3); // 2 3 4
    int sumOf234 = limitedStream.mapToInt(x -> x.intValue()).sum();
    assertThat(sumOf234, equalTo(2 + 3 + 4));

}
person Abdull    schedule 28.10.2014

Я сделал следующее. Исходный поток закрывается, но создается новый поток, который ведет себя точно так же, как старый поток.

Вам понадобятся com.google.common.collect.Iterators из гуавы.

import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Iterators.singletonIterator;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.StreamSupport.stream;

private <T> Stream<T> peekFirst(Stream<T> originalStream){
    //This closes the original Stream
    Iterator<T> originalIterator = originalStream.iterator();
    if (!originalIterator.hasNext()) {
        return Stream.of();
    }
    T firstElement = originalIterator.next();
    doSomethingWithFirstElement(firstElement);
    Iterator<T> newIterator = concat(
      singletonIterator(firstElement),
      originalIterator);
    return stream(
      spliteratorUnknownSize(newIterator, 0), 
        originalStream.isParallel());
}
person radiantRazor    schedule 07.06.2021