Получаване на следващия елемент от поток на Java 8

Бих искал да извлека и премахна следващия елемент от Java 8 Stream, без това Stream да се затвори.

Stream<Integer> integerStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Integer zero = integerStream.getNext(); // 0
Integer one  = integerStream.getNext(); // 1
...

Възможно ли е това?


person Abdull    schedule 27.10.2014    source източник
comment
Искате ли наистина да използвате премахнатите елементи или просто да ги изхвърлите? За последното .skip(n) е начинът да го направите.   -  person David Conrad    schedule 27.10.2014


Отговори (3)


Да, има начин да направите това, но с някои ограничения.

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Iterator<Integer> iter = infiniteStream.iterator();
Integer zero = iter.next();
Integer one  = iter.next();

Алтернативно,

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Spliterator<Integer> spliterator = infiniteStream.spliterator();
spliterator.tryAdvance(i -> System.out.println(i)); // zero
spliterator.tryAdvance(i -> System.out.println(i)); // one

При даден Stream е възможно да получите Iterator или Spliterator от него или да направите заявка дали е паралелен поток и т.н. Те са дефинирани на BaseStream интерфейс, суперинтерфейс на Stream, което ги прави малко лесни за пропускане.

В този случай знаем, че потокът е безкраен, така че няма нужда да извикваме hasNext() метода на Iterator или да проверяваме върнатата стойност на tryAdvance() на Spliterator

Ограничението е, че и двата метода iterator() и spliterator() на Stream са терминални операции, което означава, че след като бъдат извикани, върнатият итератор или сплитератор има изключителен достъп до стойностите, представени от потока. Допълнителни операции в потока (като filter или map и т.н.) не са разрешени и ще бъдат посрещнати с IllegalStateException.

Ако искате да отлепите първите няколко елемента и след това да възобновите обработката на потока, можете да превърнете сплитератора обратно в поток по следния начин:

Stream<Integer> stream2 = StreamSupport.stream(spliterator, false);

Това вероятно ще работи добре за някои неща, но не съм сигурен, че бих препоръчал тази техника като цяло. Мисля, че добавя няколко допълнителни обекта и по този начин допълнителни извиквания на метод в пътя за създаване на следващия елемент.

Редакционни коментари (несвързани с вашия въпрос):

  • Не използвайте new Integer(val). Вместо това използвайте Integer.valueOf(val), което ще използва отново поставеното в кутия цяло число, ако е налично, което обикновено е вярно за стойности в диапазона от -128 до 127.
  • Можете да използвате IntStream вместо Stream<Integer>, което напълно избягва режийните полета. Той няма пълния набор от поточни операции, но има iterate(), който приема функция, която работи с примитивни int стойности.
person Stuart Marks    schedule 27.10.2014
comment
Или можете просто да използвате x -> x + 1, което е по-кратко и също използва Integer.valueOf (проверено току-що с 8u25 и javap -c -p). - person David Conrad; 27.10.2014

Въз основа на отговора на Стюарт и с Преобразуване от итератор към поток, измислих следния бърз и мръсен клас обвивка. Не е тестван и не е безопасен за нишки, но ми осигурява това, от което се нуждая в момента, за премахване и използване на отделни елементи, като същевременно поддържа този поток „отворен“.

PeelingStream<T> предоставя метод T getNext(), който защитава семантиката на операцията на терминалния поток на someWrappedStream.iterator():

public class PeelingStream<T> implements Stream<T> {

    private Stream<T> wrapped;

    public PeelingStream(Stream<T> toBeWrapped) {
        this.wrapped = toBeWrapped;
    }

    public T getNext() {
        Iterator<T> iterator = wrapped.iterator();
        T next = iterator.next();
        Iterable<T> remainingIterable = () -> iterator;
        wrapped = StreamSupport.stream(remainingIterable.spliterator(),
                false);

        return next;
    }

    ///////////////////// from here, only plain delegate methods

    public Iterator<T> iterator() {
        return wrapped.iterator();
    }

    public Spliterator<T> spliterator() {
        return wrapped.spliterator();
    }

    public boolean isParallel() {
        return wrapped.isParallel();
    }

    public Stream<T> sequential() {
        return wrapped.sequential();
    }

    public Stream<T> parallel() {
        return wrapped.parallel();
    }

    public Stream<T> unordered() {
        return wrapped.unordered();
    }

    public Stream<T> onClose(Runnable closeHandler) {
        return wrapped.onClose(closeHandler);

    }

    public void close() {
        wrapped.close();
    }

    public Stream<T> filter(Predicate<? super T> predicate) {
        return wrapped.filter(predicate);
    }

    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return wrapped.map(mapper);
    }

    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return wrapped.mapToInt(mapper);
    }

    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return wrapped.mapToLong(mapper);
    }

    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return wrapped.mapToDouble(mapper);
    }

    public <R> Stream<R> flatMap(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        return wrapped.flatMap(mapper);
    }

    public IntStream flatMapToInt(
            Function<? super T, ? extends IntStream> mapper) {
        return wrapped.flatMapToInt(mapper);
    }

    public LongStream flatMapToLong(
            Function<? super T, ? extends LongStream> mapper) {
        return wrapped.flatMapToLong(mapper);
    }

    public DoubleStream flatMapToDouble(
            Function<? super T, ? extends DoubleStream> mapper) {
        return wrapped.flatMapToDouble(mapper);
    }

    public Stream<T> distinct() {
        return wrapped.distinct();
    }

    public Stream<T> sorted() {
        return wrapped.sorted();
    }

    public Stream<T> sorted(Comparator<? super T> comparator) {
        return wrapped.sorted(comparator);
    }

    public Stream<T> peek(Consumer<? super T> action) {
        return wrapped.peek(action);
    }

    public Stream<T> limit(long maxSize) {
        return wrapped.limit(maxSize);
    }

    public Stream<T> skip(long n) {
        return wrapped.skip(n);
    }

    public void forEach(Consumer<? super T> action) {
        wrapped.forEach(action);
    }

    public void forEachOrdered(Consumer<? super T> action) {
        wrapped.forEachOrdered(action);
    }

    public Object[] toArray() {
        return wrapped.toArray();
    }

    public <A> A[] toArray(IntFunction<A[]> generator) {
        return wrapped.toArray(generator);
    }

    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return wrapped.reduce(identity, accumulator);
    }

    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return wrapped.reduce(accumulator);
    }

    public <U> U reduce(U identity,
            BiFunction<U, ? super T, U> accumulator,
            BinaryOperator<U> combiner) {
        return wrapped.reduce(identity, accumulator, combiner);
    }

    public <R> R collect(Supplier<R> supplier,
            BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return wrapped.collect(supplier, accumulator, combiner);
    }

    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return wrapped.collect(collector);
    }

    public Optional<T> min(Comparator<? super T> comparator) {
        return wrapped.min(comparator);
    }

    public Optional<T> max(Comparator<? super T> comparator) {
        return wrapped.max(comparator);
    }

    public long count() {
        return wrapped.count();
    }

    public boolean anyMatch(Predicate<? super T> predicate) {
        return wrapped.anyMatch(predicate);
    }

    public boolean allMatch(Predicate<? super T> predicate) {
        return wrapped.allMatch(predicate);
    }

    public boolean noneMatch(Predicate<? super T> predicate) {
        return wrapped.noneMatch(predicate);
    }

    public Optional<T> findFirst() {
        return wrapped.findFirst();
    }

    public Optional<T> findAny() {
        return wrapped.findAny();
    }

}

Малък тест:

@Test
public void testPeelingOffItemsFromStream() {

    Stream<Integer> infiniteStream = Stream.iterate(0, x -> x + 1);

    PeelingStream<Integer> peelingInfiniteStream = new PeelingStream<>(infiniteStream);

    Integer one = peelingInfiniteStream.getNext();
    assertThat(one, equalTo(0));

    Integer two = peelingInfiniteStream.getNext();
    assertThat(two, equalTo(1));

    Stream<Integer> limitedStream = peelingInfiniteStream.limit(3); // 2 3 4
    int sumOf234 = limitedStream.mapToInt(x -> x.intValue()).sum();
    assertThat(sumOf234, equalTo(2 + 3 + 4));

}
person Abdull    schedule 28.10.2014

Направих следното. Оригиналният поток се затваря, но се създава нов поток, който се държи точно като стария поток.

Ще ви трябват com.google.common.collect.Iterators от guava.

import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Iterators.singletonIterator;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.StreamSupport.stream;

private <T> Stream<T> peekFirst(Stream<T> originalStream){
    //This closes the original Stream
    Iterator<T> originalIterator = originalStream.iterator();
    if (!originalIterator.hasNext()) {
        return Stream.of();
    }
    T firstElement = originalIterator.next();
    doSomethingWithFirstElement(firstElement);
    Iterator<T> newIterator = concat(
      singletonIterator(firstElement),
      originalIterator);
    return stream(
      spliteratorUnknownSize(newIterator, 0), 
        originalStream.isParallel());
}
person radiantRazor    schedule 07.06.2021