Разница между reduce и foldLeft / fold в функциональном программировании (особенно в API Scala и Scala)?

Почему в Scala и таких фреймворках, как Spark и Scalding, есть и reduce, и foldLeft? Так в чем же разница между reduce и fold?


person samthebest    schedule 06.08.2014    source источник
comment
См. Также: stackoverflow.com/questions/16111440/scala -fold-vs-foldleft /   -  person axel22    schedule 06.08.2014
comment
Также разница между свертыванием и сокращением исправленных   -  person Alper t. Turker    schedule 11.05.2018


Ответы (4)


уменьшить vs foldLeft

Большое большое различие, не упомянутое ни в одном другом ответе на переполнение стека, относящемся к этой теме, заключается в том, что reduce должен быть задан коммутативный моноид, то есть операция, которая является как коммутативной, так и ассоциативной. Это означает, что операцию можно распараллелить.

Это различие очень важно для больших данных / MPP / распределенных вычислений и является единственной причиной, по которой reduce вообще существует. Коллекция может быть разделена на части, и reduce может работать с каждым фрагментом, тогда reduce может работать с результатами каждого фрагмента - фактически, уровень фрагментации не должен останавливаться на один уровень глубже. Мы тоже можем нарезать каждый кусок. Вот почему суммирование целых чисел в списке составляет O (log N), если дано бесконечное количество процессоров.

Если вы просто посмотрите на подписи, у reduce нет причин для существования, потому что вы можете достичь всего, что можете с reduce, с foldLeft. Функциональность foldLeft больше, чем функциональность reduce.

Но вы не можете распараллелить foldLeft, поэтому время его выполнения всегда равно O (N) (даже если вы используете коммутативный моноид). Это связано с тем, что предполагается, что операция не является коммутативным моноидом, и поэтому накопленное значение будет вычисляться посредством серии последовательных агрегатов.

foldLeft не предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность разбить коллекцию, и ее коммутативность, которая упрощает накопление, потому что порядок не важен (поэтому не имеет значения, в каком порядке агрегировать каждый из результатов из каждого из фрагментов). Строго говоря, коммутативность не требуется для распараллеливания, например для алгоритмов распределенной сортировки, она просто упрощает логику, потому что вам не нужно упорядочивать свои фрагменты.

Если вы посмотрите документацию Spark для reduce, там конкретно написано «... коммутативный и ассоциативный бинарный оператор»

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Вот доказательство того, что reduce НЕ является частным случаем foldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

уменьшить vs свернуть

Теперь это то, где он становится немного ближе к математическим корням FP /, и его немного сложнее объяснить. Reduce формально определяется как часть парадигмы MapReduce, которая имеет дело с беспорядочными коллекциями (мультимножествами), Fold формально определяется в терминах рекурсии (см. Катаморфизм) и, таким образом, предполагает структуру / последовательность для коллекций.

В Scalding нет fold метода, потому что в рамках (строгой) модели программирования Map Reduce мы не можем определить fold, потому что чанки не имеют упорядочения, а fold требует только ассоциативности, а не коммутативности.

Проще говоря, reduce работает без порядка кумуляции, fold требует порядка кумуляции, и именно этот порядок кумуляции требует нулевого значения, НЕ наличия нулевого значения, которое их отличает. Строго говоря, reduce должен работать с пустой коллекцией, потому что его нулевое значение можно вывести, взяв произвольное значение x и затем решив x op y = x, но это не работает с некоммутативной операцией, поскольку может существуют левое и правое нулевые значения, которые различны (т. е. x op y != y op x). Конечно, Scala не пытается выяснить, что это за нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, невычислима), поэтому просто выдает исключение.

Кажется (как это часто бывает в этимологии), этот первоначальный математический смысл был утерян, поскольку единственное очевидное различие в программировании - это подпись. В результате reduce стал синонимом fold, вместо того, чтобы сохранить его первоначальное значение из MapReduce. Теперь эти термины часто используются как взаимозаменяемые и ведут себя одинаково в большинстве реализаций (игнорируя пустые коллекции). Странность усугубляется особенностями, такими как в Spark, которые мы сейчас рассмотрим.

Итак, Spark имеет fold, но порядок, в котором объединяются подрезультаты (по одному для каждого раздела) (на момент написания), совпадает с порядком выполнения задач - и, следовательно, не- детерминированный. Спасибо @CafeFeed за указание, что fold использует runJob, и после прочтения кода я понял, что он недетерминирован. Дальнейшая путаница возникает из-за того, что Spark имеет treeReduce, но не treeFold.

Заключение

Есть разница между reduce и fold, даже когда они применяются к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce для коллекций с произвольным порядком (http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf), и следует предположить, что операторы не только ассоциативны, но и коммутативны, чтобы давать детерминированные результаты. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.

На практике из-за нематематической природы программирования reduce и fold, как правило, ведут себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).

Дополнительно: мое мнение о Spark API

Я считаю, что путаницы можно избежать, если полностью отказаться от термина fold в Spark. По крайней мере, у Spark есть примечание в их документации:

Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala.

person samthebest    schedule 06.08.2014
comment
Вот почему foldLeft содержит Left в своем имени и почему существует также метод, называемый fold. - person kiritsuku; 06.08.2014
comment
@sschaef верно, но fold не существует, скажем, в Scalding, потому что, в отличие от reduce, он не требует коммутативности. Я обновил свой ответ, чтобы объяснить это. По сути, я пытаюсь подчеркнуть, что разница между fold* и reduce* очень сильно связана с корнями FP в теории категорий. - person samthebest; 06.08.2014
comment
У вас есть пример для различных (например, x op y! = Y op x) с помощью сокращения и свертывания, чтобы увидеть разницу. Я не думаю, что это правильное утверждение для сокращения. Например, для сокращения я использую некоммутативную операцию деления. (List (1000000.0) ::: List.tabulate (100) (_ + 0.001)). Reduce (_ / _) Для оператора (т.е. x op y! = Y op x). он должен давать мне случайный результат, но он дает мне каждый раз один и тот же результат, поэтому я думаю, что сокращение все еще сохраняет порядок кумуляции - person Xiaohe Dong; 07.08.2014
comment
@Cloudtech Это совпадение с однопоточной реализацией, не входящей в спецификацию. На моем 4-ядерном компьютере, если я попытаюсь добавить .par, поэтому (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) я получаю каждый раз разные результаты. - person samthebest; 07.08.2014
comment
Отличный ответ. Я думаю, что сокращение должно иметь коммутативный моноид, то есть операция, которая является как коммутативной, так и ассоциативной, должна читаться следующим образом: reduce должна иметь коммутативную полугруппу, то есть структуру данных, которая поддерживает операцию комбинирования, которая является как коммутативной, так и ассоциативной. Я не верю, что для сокращения нужен элемент идентичности. - person Alex Dean; 24.11.2014
comment
@AlexDean в контексте информатики, нет, на самом деле ему не нужна личность, поскольку пустые коллекции, как правило, просто генерируют исключения. Но математически более элегантно (и было бы более элегантно, если бы это делали коллекции), если элемент идентичности возвращается, когда коллекция пуста. В математике исключений не существует. - person samthebest; 25.11.2014
comment
@samthebest - я понимаю, о чем вы говорите, спасибо. Вот почему вам нужно работать с sumOption для полугрупп в Algebird ... - person Alex Dean; 26.11.2014
comment
@samthebest: Вы уверены в коммутативности? noreferrer "noreferrer" github.com/apache/spark/blob/ говорит, что для некоммутативных функций результат может отличаться от результата свертывания, примененного к нераспределенной коллекции. - person Make42; 17.05.2016
comment
Чтобы быть честным, контракт fold на RDD в точности совпадает с контрактом Scala ParSeq.fold. Хотя ParSeqs обычно (всегда?) Следят за порядком, это на самом деле не является частью контракта. Можно легко реализовать fold (или не так просто foldByKey), который гарантирует порядок слияния, и основная причина сделать это так, как это делается сейчас, - это производительность / гибкость планирования. - person zero323; 07.06.2016
comment
@ Make42 Я обновил свой ответ, действительно, я предположил, что порядок объединения был выполнен в порядке разделов (что было бы неплохо и логично реализовать его), но оказалось, что он основан на более первый пришел - первый обслужил базис и, следовательно, недетерминированный. - person samthebest; 07.06.2016
comment
@samthebest: Итак, в Spark все методы комбайнера (reduce, reduceByKey, fold, foldByKey, aggregate, aggregateByKey, combineByKey) должны передавать функции, которые в конце концов являются ассоциативными и коммутативными, верно? - person Make42; 08.06.2016
comment
@ Make42 Правильно, можно написать своего собственного reallyFold сутенера, например: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f), для этого f не понадобится. - person samthebest; 08.06.2016
comment
@samthebest: Не уверен, что это работает ... Сбор также получает разделы в порядке очереди. Как узнать, что они находятся в правильном порядке для второго сгиба? Я попробовал это на реальном примере и вернул свои слова в правильном порядке как в вашем коде, так и в исходной свертке Spark. Я подозреваю, что внутри свертки разделов в Spark не потребуется коммутативность, но проблема заключается в объединении разделов. Ваше решение не улучшит этого, поскольку сбор будет иметь ту же проблему. Что вы думаете? - person Make42; 09.06.2016
comment
@ Make42 Я прочитал код для сбора, он сохраняет порядок разделов. Пройдите через него, и вы увидите этот (index, res) => results(index) = res как обработчик результата, поэтому обработчик результата использует индекс раздела для помещения результата в Array - person samthebest; 09.06.2016
comment
@samthebest, поэтому на данный момент у меня есть вопрос, что concat строки некоммутативен, когда я запускаю этот код несколько раз List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par.reduce(_+_), я думаю, он должен давать мне случайный результат, но я получаю каждый раз один и тот же результат. - person altayseyhan; 16.02.2017
comment
@Cloudtech @samthebest Я предполагаю, что в этом случае (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) каждый раз дает разные результаты, связанные с неассоциативной операцией деления, я имею в виду не связанной с некоммутативностью [docs.scala-lang.org/overviews/parallel-collections/, пожалуйста, посмотрите на строку, начинающуюся с Примечание. Часто считается что.. - person altayseyhan; 16.02.2017
comment
Мне действительно неприятно нарушать идеальный подсчет голосов в 256 голосов, но это отличный ответ !! - person jrista; 21.07.2020

Если я не ошибаюсь, даже если Spark API этого не требует, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегированы разделы, не гарантирован. Например, в следующем коде сортируется только первая распечатка:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Распечатка:

АБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЫЭЮЯ

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

person Mishael Rosenthal    schedule 16.03.2015
comment
Поразмыслив, мы считаем, что вы правы. Порядок комбинирования - первым пришел - первым подал. Если вы запустите sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _) с 2+ ядрами несколько раз, я думаю, вы увидите, что он производит случайный (по разделам) порядок. Я соответствующим образом обновил свой ответ. - person samthebest; 08.06.2016

fold в Apache Spark - это не то же самое, что fold в нераспространяемых коллекциях. Фактически для получения детерминированных результатов требуется коммутативная функция:

Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala. Эту операцию сворачивания можно применять к разделам по отдельности, а затем сворачивать эти результаты в окончательный результат, а не применять сворачивание к каждому элементу последовательно в некотором определенном порядке. Для некоммутативных функций результат может отличаться от результата свертки, примененной к нераспределенной коллекции.

Этот был показан Мишель Розенталь и предложено Make42 в его комментарий.

Было предложено, что наблюдаемое поведение относится к HashPartitioner, хотя на самом деле parallelize не перемешивается и не использует HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Разъяснил:

Структура fold для RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

то же самое как структура reduce для СДР:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

где runJob выполняется без учета порядка разбиения и требует коммутативной функции.

foldPartition и reducePartition эквивалентны с точки зрения порядка обработки и эффективно (путем наследования и делегирования) реализованы _ 14_ и _ 15_ на _ 16_.

Вывод: fold в RDD не может зависеть от порядка фрагментов и требует коммутативности и ассоциативности.

person Community    schedule 05.06.2016
comment
Я должен признать, что этимология сбивает с толку, а в литературе по программированию отсутствуют формальные определения. Я думаю, можно с уверенностью сказать, что fold на RDDs действительно действительно то же самое, что и reduce, но при этом не учитываются основные математические различия (я обновил свой ответ, чтобы он был еще более ясным). Хотя я не согласен с тем, что нам действительно нужна коммутативность, при условии, что каждый уверен, что бы ни делал его разделитель, он сохраняет порядок. - person samthebest; 06.06.2016
comment
Неопределенный порядок складывания не связан с разбиением. Это прямое следствие реализации runJob. - person ; 06.06.2016
comment
А! Извините, я не смог понять, в чем заключалась ваша точка зрения, но, прочитав код runJob, я вижу, что действительно он выполняет объединение в соответствии с тем, когда задача завершена, а НЕ по порядку разделов. Именно эта ключевая деталь заставляет все встать на свои места. Я отредактировал свой ответ еще раз и тем самым исправил указанную вами ошибку. Пожалуйста, не могли бы вы удалить свою награду, раз уж мы пришли к соглашению? - person samthebest; 07.06.2016
comment
Редактировать или удалять не могу - такой возможности нет. Я могу наградить, но я думаю, что вы получаете немало очков за одно только внимание, я ошибаюсь? Если вы подтвердите, что хотите, чтобы я наградил вас, я сделаю это в течение следующих 24 часов. Спасибо за исправления и извините за метод, но похоже, что вы игнорируете все предупреждения, это большое дело, и ответ цитировался повсюду. - person ; 07.06.2016
comment
Как насчет того, чтобы вручить награду @Mishael Rosenthal, поскольку он первым четко выразил озабоченность. Меня не интересуют пункты, мне просто нравится использовать SO для SEO и организации. - person samthebest; 08.06.2016

Еще одно отличие Scalding - это использование в Hadoop объединителей.

Представьте, что ваша операция является коммутативным моноидом, с reduce она также будет применяться на стороне карты вместо перетасовки / сортировки всех данных в редукторы. С foldLeft дело обстоит иначе.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

Всегда полезно определять свои операции как моноид в Scalding.

person morazow    schedule 07.08.2014