Являются ли преобразователи Clojure той же концепцией, что и промежуточные операции над потоками в Java?

Когда я изучал преобразователи в Clojure, меня внезапно осенило, что они мне напоминают: потоки Java 8!

Преобразователи - это составные алгоритмические преобразования. Они не зависят от контекста своих источников ввода и вывода и определяют только суть преобразования в терминах отдельного элемента.

поток не является структурой данных, в которой хранятся элементы; вместо этого он передает элементы из источника, такого как структура данных, массив, функция генератора или канал ввода-вывода, через конвейер вычислительных операций.

Clojure:

(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

(println
  (transduce xf + (range 100)))  ; => 30
(println
  (into [] xf (range 100)))      ; => [2 4 6 8 10]

Джава:

// Purposely using Function and boxed primitive streams (instead of
// UnaryOperator<LongStream>) in order to keep it general.
Function<Stream<Long>, Stream<Long>> xf =
        s -> s.filter(n -> n % 2L == 1L)
                .map(n -> n + 1L)
                .limit(5L);

System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .reduce(0L, Math::addExact));    // => 30
System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .collect(Collectors.toList()));  // => [2, 4, 6, 8, 10]

За исключением разницы в статической / динамической типизации, они мне кажутся очень похожими по назначению и использованию.

Является ли аналогия с преобразованиями потоков Java разумным подходом к преобразователям? Если нет, то в чем его недостатки или чем они отличаются по концепции (не говоря уже о реализации)?


person glts    schedule 01.02.2016    source источник
comment
Моему неподготовленному глазу так кажется. А вот преобразователь у них был такой высокий, может, есть еще что-то? :)   -  person ZhongYu    schedule 01.02.2016
comment
Семантического сходства предостаточно!   -  person Frank C.    schedule 01.02.2016


Ответы (2)


Основное отличие состоит в том, что набор глаголов (операций) каким-то образом закрыт для потоков, в то время как он открыт для преобразователей: попробуйте, например, реализовать partition в потоках, это кажется немного второстепенным:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Partitioner<T> implements Function<T, Stream<Stream<T>>> {
        final Function<T, ?> f;

        Object prev;
        Builder<T> sb;

        public Partitioner(Function<T, ?> f) {
            this.f = f;
        }

        public Stream<Stream<T>> apply(T t) {
            Object tag = f.apply(t);
            if (sb != null && prev.equals(tag)) {
                sb.accept(t);
                return Stream.empty();
            }
            Stream<Stream<T>> partition = sb == null ? Stream.empty() : Stream.of(sb.build());
            sb = Stream.builder();
            sb.accept(t);
            prev = tag;
            return partition;
        }

        Stream<Stream<T>> flush() {
            return sb == null ? Stream.empty() : Stream.of(sb.build());
        }
    }

    static <T> Stream<Stream<T>> partitionBy(Stream<T> in, Function<T, ?> f) {
        Partitioner<T> partitioner = new Partitioner<>(f);
        return Stream.concat(in.flatMap(partitioner), delay(() -> partitioner.flush()));
    }
}

Также, как последовательности и редукторы, когда вы преобразовываете, вы не создаете «большие» вычисления, вы создаете «больший» источник.

Чтобы иметь возможность передавать вычисления, вы ввели xf функцию из Stream в Stream, которая поднимает операции от методов до сущностей первого класса (чтобы отвязать их от источника). Таким образом вы создали преобразователь, хотя и со слишком большим интерфейсом.

Ниже приведена более общая версия приведенного выше кода для применения любого преобразователя (clojure) к потоку:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.Reduced;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Transducer implements Function {
        IFn rf;

        public Transducer(IFn xf) {
            rf = (IFn) xf.invoke(new AFn() {
                public Object invoke(Object acc) {
                    return acc;
                }

                public Object invoke(Object acc, Object item) {
                    ((Builder<Object>) acc).accept(item);
                    return acc;
                }
            });
        }

        public Stream<?> apply(Object t) {
            if (rf == null) return Stream.empty();
            Object ret = rf.invoke(Stream.builder(), t);
            if (ret instanceof Reduced) {
                Reduced red = (Reduced) ret;
                Builder<?> sb = (Builder<?>) red.deref();
                return Stream.concat(sb.build(), flush());
            }
            return ((Builder<?>) ret).build();
        }

        Stream<?> flush() {
            if (rf == null) return Stream.empty();
            Builder<?> sb = (Builder<?>) rf.invoke(Stream.builder());
            rf = null;
            return sb.build();
        }
    }

    static <T> Stream<?> withTransducer(Stream<T> in, IFn xf) {
        Transducer transducer = new Transducer(xf);
        return Stream.concat(in.flatMap(transducer), delay(() -> transducer.flush()));
    }
}
person cgrand    schedule 01.02.2016
comment
Могу я сказать еще несколько слов, сэр? - person Arthur Ulfeldt; 02.02.2016
comment
@ArthurUlfeldt Еще слов, не уверен, что так лучше :) - person cgrand; 02.02.2016
comment
Это действительно лучше! Это показывает, что они одинаковы только в тривиальных случаях, когда редуцирующего контекста мало или нет. Я подозреваю, что это то, о чем просил glts. - person Arthur Ulfeldt; 04.02.2016

Еще одно важное отличие, которое я вижу, заключается в том, что преобразователи Clojure компонуемы. У меня часто бывает ситуация, когда мои потоковые конвейеры немного длиннее, чем в вашем примере, где есть только некоторые промежуточные шаги, которые я мог бы повторно использовать в другом месте, например:

someStream
   .map(...)
   .filter(...)
   .map(...)      // <- gee, there are at least two other
   .filter(...)   // <- pipelines where I could use the functionality
   .map(...)      // <- of just these three steps!
   .filter(...)
   .collect(...)

Я не нашел разумного способа добиться этого. Я хотел бы иметь что-то вроде этого:

Transducer<Integer,String> smallTransducer = s -> s.map(...); // usable in a stream Integer -> String
Transducer<String,MyClass> otherTransducer = s -> s.filter(...).map(...); // stream String -> MyClass
Transducer<Integer,MyClass> combinedTransducer = smallTransducer.then(otherTransducer); // compose transducers, to get an Integer -> MyClass transducer

а затем используйте это так:

someStream
   .map(...)
   .filter(...)
   .transduce(smallTransducer)
   .transduce(otherTransducer)
   .filter(...)
   .collect(...)

// or

someStream
   .map(...)
   .filter(...)
   .transduce(combinedTransducer)
   .filter(...)
   .collect(...)
person ThomasH    schedule 30.10.2019
comment
По крайней мере, похоже, что существует (неподдерживаемая?) Библиотека Java, которая идет в этом направлении: github.com /ognitect-labs / transducers-java - person ThomasH; 30.10.2019