Почему в Scala и таких фреймворках, как Spark и Scalding, есть и reduce
, и foldLeft
? Так в чем же разница между reduce
и fold
?
Разница между reduce и foldLeft / fold в функциональном программировании (особенно в API Scala и Scala)?
Ответы (4)
уменьшить vs foldLeft
Большое большое различие, не упомянутое ни в одном другом ответе на переполнение стека, относящемся к этой теме, заключается в том, что reduce
должен быть задан коммутативный моноид, то есть операция, которая является как коммутативной, так и ассоциативной. Это означает, что операцию можно распараллелить.
Это различие очень важно для больших данных / MPP / распределенных вычислений и является единственной причиной, по которой reduce
вообще существует. Коллекция может быть разделена на части, и reduce
может работать с каждым фрагментом, тогда reduce
может работать с результатами каждого фрагмента - фактически, уровень фрагментации не должен останавливаться на один уровень глубже. Мы тоже можем нарезать каждый кусок. Вот почему суммирование целых чисел в списке составляет O (log N), если дано бесконечное количество процессоров.
Если вы просто посмотрите на подписи, у reduce
нет причин для существования, потому что вы можете достичь всего, что можете с reduce
, с foldLeft
. Функциональность foldLeft
больше, чем функциональность reduce
.
Но вы не можете распараллелить foldLeft
, поэтому время его выполнения всегда равно O (N) (даже если вы используете коммутативный моноид). Это связано с тем, что предполагается, что операция не является коммутативным моноидом, и поэтому накопленное значение будет вычисляться посредством серии последовательных агрегатов.
foldLeft
не предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность разбить коллекцию, и ее коммутативность, которая упрощает накопление, потому что порядок не важен (поэтому не имеет значения, в каком порядке агрегировать каждый из результатов из каждого из фрагментов). Строго говоря, коммутативность не требуется для распараллеливания, например для алгоритмов распределенной сортировки, она просто упрощает логику, потому что вам не нужно упорядочивать свои фрагменты.
Если вы посмотрите документацию Spark для reduce
, там конкретно написано «... коммутативный и ассоциативный бинарный оператор»
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Вот доказательство того, что reduce
НЕ является частным случаем foldLeft
scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds
уменьшить vs свернуть
Теперь это то, где он становится немного ближе к математическим корням FP /, и его немного сложнее объяснить. Reduce формально определяется как часть парадигмы MapReduce, которая имеет дело с беспорядочными коллекциями (мультимножествами), Fold формально определяется в терминах рекурсии (см. Катаморфизм) и, таким образом, предполагает структуру / последовательность для коллекций.
В Scalding нет fold
метода, потому что в рамках (строгой) модели программирования Map Reduce мы не можем определить fold
, потому что чанки не имеют упорядочения, а fold
требует только ассоциативности, а не коммутативности.
Проще говоря, reduce
работает без порядка кумуляции, fold
требует порядка кумуляции, и именно этот порядок кумуляции требует нулевого значения, НЕ наличия нулевого значения, которое их отличает. Строго говоря, reduce
должен работать с пустой коллекцией, потому что его нулевое значение можно вывести, взяв произвольное значение x
и затем решив x op y = x
, но это не работает с некоммутативной операцией, поскольку может существуют левое и правое нулевые значения, которые различны (т. е. x op y != y op x
). Конечно, Scala не пытается выяснить, что это за нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, невычислима), поэтому просто выдает исключение.
Кажется (как это часто бывает в этимологии), этот первоначальный математический смысл был утерян, поскольку единственное очевидное различие в программировании - это подпись. В результате reduce
стал синонимом fold
, вместо того, чтобы сохранить его первоначальное значение из MapReduce. Теперь эти термины часто используются как взаимозаменяемые и ведут себя одинаково в большинстве реализаций (игнорируя пустые коллекции). Странность усугубляется особенностями, такими как в Spark, которые мы сейчас рассмотрим.
Итак, Spark имеет fold
, но порядок, в котором объединяются подрезультаты (по одному для каждого раздела) (на момент написания), совпадает с порядком выполнения задач - и, следовательно, не- детерминированный. Спасибо @CafeFeed за указание, что fold
использует runJob
, и после прочтения кода я понял, что он недетерминирован. Дальнейшая путаница возникает из-за того, что Spark имеет treeReduce
, но не treeFold
.
Заключение
Есть разница между reduce
и fold
, даже когда они применяются к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce для коллекций с произвольным порядком (http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf), и следует предположить, что операторы не только ассоциативны, но и коммутативны, чтобы давать детерминированные результаты. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.
На практике из-за нематематической природы программирования reduce
и fold
, как правило, ведут себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).
Дополнительно: мое мнение о Spark API
Я считаю, что путаницы можно избежать, если полностью отказаться от термина fold
в Spark. По крайней мере, у Spark есть примечание в их документации:
Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala.
foldLeft
содержит Left
в своем имени и почему существует также метод, называемый fold
.
- person kiritsuku; 06.08.2014
fold
не существует, скажем, в Scalding, потому что, в отличие от reduce
, он не требует коммутативности. Я обновил свой ответ, чтобы объяснить это. По сути, я пытаюсь подчеркнуть, что разница между fold*
и reduce*
очень сильно связана с корнями FP в теории категорий.
- person samthebest; 06.08.2014
.par
, поэтому (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
я получаю каждый раз разные результаты.
- person samthebest; 07.08.2014
fold
на RDD в точности совпадает с контрактом Scala ParSeq.fold
. Хотя ParSeqs
обычно (всегда?) Следят за порядком, это на самом деле не является частью контракта. Можно легко реализовать fold
(или не так просто foldByKey
), который гарантирует порядок слияния, и основная причина сделать это так, как это делается сейчас, - это производительность / гибкость планирования.
- person zero323; 07.06.2016
reduce
, reduceByKey
, fold
, foldByKey
, aggregate
, aggregateByKey
, combineByKey
) должны передавать функции, которые в конце концов являются ассоциативными и коммутативными, верно?
- person Make42; 08.06.2016
reallyFold
сутенера, например: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
, для этого f не понадобится.
- person samthebest; 08.06.2016
(index, res) => results(index) = res
как обработчик результата, поэтому обработчик результата использует индекс раздела для помещения результата в Array
- person samthebest; 09.06.2016
List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par.reduce(_+_)
, я думаю, он должен давать мне случайный результат, но я получаю каждый раз один и тот же результат.
- person altayseyhan; 16.02.2017
(List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
каждый раз дает разные результаты, связанные с неассоциативной операцией деления, я имею в виду не связанной с некоммутативностью [docs.scala-lang.org/overviews/parallel-collections/, пожалуйста, посмотрите на строку, начинающуюся с Примечание. Часто считается что..
- person altayseyhan; 16.02.2017
Если я не ошибаюсь, даже если Spark API этого не требует, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегированы разделы, не гарантирован. Например, в следующем коде сортируется только первая распечатка:
import org.apache.spark.{SparkConf, SparkContext}
object FoldExample extends App{
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Simple Application")
implicit val sc = new SparkContext(conf)
val range = ('a' to 'z').map(_.toString)
val rdd = sc.parallelize(range)
println(range.reduce(_ + _))
println(rdd.reduce(_ + _))
println(rdd.fold("")(_ + _))
}
Распечатка:
АБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЫЭЮЯ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
с 2+ ядрами несколько раз, я думаю, вы увидите, что он производит случайный (по разделам) порядок. Я соответствующим образом обновил свой ответ.
- person samthebest; 08.06.2016
fold
в Apache Spark - это не то же самое, что fold
в нераспространяемых коллекциях. Фактически для получения детерминированных результатов требуется коммутативная функция:
Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala. Эту операцию сворачивания можно применять к разделам по отдельности, а затем сворачивать эти результаты в окончательный результат, а не применять сворачивание к каждому элементу последовательно в некотором определенном порядке. Для некоммутативных функций результат может отличаться от результата свертки, примененной к нераспределенной коллекции.
Этот был показан Мишель Розенталь и предложено Make42 в его комментарий.
Было предложено, что наблюдаемое поведение относится к HashPartitioner
, хотя на самом деле parallelize
не перемешивается и не использует HashPartitioner
.
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
Разъяснил:
Структура fold
для RDD
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
то же самое как структура reduce
для СДР:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
где runJob
выполняется без учета порядка разбиения и требует коммутативной функции.
foldPartition
и reducePartition
эквивалентны с точки зрения порядка обработки и эффективно (путем наследования и делегирования) реализованы _ 14_ и _ 15_ на _ 16_.
Вывод: fold
в RDD не может зависеть от порядка фрагментов и требует коммутативности и ассоциативности.
fold
на RDD
s действительно действительно то же самое, что и reduce
, но при этом не учитываются основные математические различия (я обновил свой ответ, чтобы он был еще более ясным). Хотя я не согласен с тем, что нам действительно нужна коммутативность, при условии, что каждый уверен, что бы ни делал его разделитель, он сохраняет порядок.
- person samthebest; 06.06.2016
runJob
, я вижу, что действительно он выполняет объединение в соответствии с тем, когда задача завершена, а НЕ по порядку разделов. Именно эта ключевая деталь заставляет все встать на свои места. Я отредактировал свой ответ еще раз и тем самым исправил указанную вами ошибку. Пожалуйста, не могли бы вы удалить свою награду, раз уж мы пришли к соглашению?
- person samthebest; 07.06.2016
Еще одно отличие Scalding - это использование в Hadoop объединителей.
Представьте, что ваша операция является коммутативным моноидом, с reduce она также будет применяться на стороне карты вместо перетасовки / сортировки всех данных в редукторы. С foldLeft дело обстоит иначе.
pipe.groupBy('product) {
_.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
// reduce is .mapReduceMap in disguise
}
pipe.groupBy('product) {
_.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}
Всегда полезно определять свои операции как моноид в Scalding.