Защо Scala и рамки като Spark и Scalding имат както reduce
, така и foldLeft
? Тогава каква е разликата между reduce
и fold
?
Разлика между намаляване и foldLeft/fold във функционалното програмиране (по-специално Scala и Scala API)?
Отговори (4)
намаляване срещу foldLeft
Голяма голяма разлика, която не се споменава в нито един друг отговор на stackoverflow, свързан ясно с тази тема, е, че на reduce
трябва да се даде комутативен моноид, т.е. операция, която е едновременно комутативна и асоциативна. Това означава, че операцията може да бъде паралелизирана.
Това разграничение е много важно за големи данни / MPP / разпределени изчисления и цялата причина, поради която reduce
дори съществува. Колекцията може да бъде нарязана и reduce
може да работи с всяка част, след което reduce
може да работи с резултатите от всяка част - всъщност нивото на разделяне не трябва да спира едно ниво навътре. Можем също да накълцаме всяко парче. Ето защо сумирането на цели числа в списък е O(log N), ако са дадени безкраен брой процесори.
Ако просто погледнете подписите, няма причина reduce
да съществува, защото можете да постигнете всичко, което можете с reduce
с foldLeft
. Функционалността на foldLeft
е по-голяма от функционалността на reduce
.
Но не можете да паралелизирате foldLeft
, така че времето му за изпълнение винаги е O(N) (дори ако захранвате комутативен моноид). Това е така, защото се приема, че операцията не е комутативен моноид и така натрупаната стойност ще бъде изчислена чрез поредица от последователни агрегации.
foldLeft
не предполага комутативност, нито асоциативност. Това е асоциативността, която дава възможност за нарязване на колекцията, и това е комутативността, която прави натрупването лесно, тъй като редът не е важен (така че няма значение в кой ред да се агрегират всеки от резултатите от всяко от парчетата). Строго погледнато комутативността не е необходима за паралелизиране, например алгоритми за разпределено сортиране, тя просто прави логиката по-лесна, защото не е необходимо да подреждате вашите парчета.
Ако погледнете документацията на Spark за reduce
, тя изрично казва "... комутативен и асоциативен двоичен оператор"
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Ето доказателство, че reduce
НЕ е само частен случай на foldLeft
scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds
намаляване срещу сгъване
Сега това е мястото, където става малко по-близо до FP / математическите корени и е малко по-трудно за обяснение. Reduce се дефинира официално като част от парадигмата MapReduce, която се занимава с колекции без подреждане (мултисетове), Fold е формално дефиниран от гледна точка на рекурсия (вижте катаморфизъм) и по този начин приема структура/последователност на колекциите.
Няма fold
метод в Scalding, тъй като при (строгия) програмен модел Map Reduce не можем да дефинираме fold
, тъй като парчетата нямат подреждане и fold
изисква само асоциативност, не комутативност.
Казано просто, reduce
работи без ред на натрупване, fold
изисква ред на натрупване и именно този ред на натрупване изисква нулева стойност, НЕ съществуването на нулева стойност, която ги отличава. Строго погледнато reduce
трябва да работи върху празна колекция, защото нейната нулева стойност може да се изведе чрез вземане на произволна стойност x
и след това решаване на x op y = x
, но това не работи с некомутативна операция, тъй като може съществуват лява и дясна нулева стойност, които са различни (т.е. x op y != y op x
). Разбира се, Scala не си прави труда да определи каква е тази нулева стойност, тъй като това би изисквало извършване на някои математически изчисления (които вероятно са неизчислими), така че просто хвърля изключение.
Изглежда (както често се случва в етимологията), че това първоначално математическо значение е изгубено, тъй като единствената очевидна разлика в програмирането е подписът. Резултатът е, че reduce
се е превърнало в синоним на fold
, вместо да запази първоначалното си значение от MapReduce. Сега тези термини често се използват взаимозаменяемо и се държат по същия начин в повечето реализации (игнорирайки празни колекции). Странността се изостря от особености, като в Spark, които сега ще разгледаме.
Така че Spark има fold
, но редът, в който подрезултатите (по един за всеки дял) се комбинират (по време на писане) е същият ред, в който се изпълняват задачите - и следователно не- детерминистичен. Благодаря на @CafeFeed, че посочи, че fold
използва runJob
, което след прочитане на кода разбрах, че не е детерминирано. Допълнително объркване се създава от това, че Spark има treeReduce
, но не и treeFold
.
Заключение
Има разлика между reduce
и fold
дори когато се прилага към непразни последователности. Първото е дефинирано като част от парадигмата за програмиране MapReduce върху колекции с произволен ред (http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf) и трябва да приемем, че операторите са комутативни в допълнение към това, че са асоциативни, за да дават детерминистични резултати. Последният се дефинира от гледна точка на катоморфизми и изисква колекциите да имат понятие за последователност (или да се дефинират рекурсивно, като свързани списъци), следователно не изискват комутативни оператори.
На практика поради нематематичния характер на програмирането, reduce
и fold
са склонни да се държат по един и същи начин, или правилно (като в Scala), или неправилно (като в Spark).
Допълнително: Моето мнение за API на Spark
Моето мнение е, че объркването ще бъде избегнато, ако използването на термина fold
бъде изцяло премахнато в Spark. Поне Spark има бележка в документацията си:
Това се държи малко по-различно от операциите за сгъване, реализирани за неразпределени колекции във функционални езици като Scala.
foldLeft
съдържа Left
в името си и защо има и метод, наречен fold
.
- person kiritsuku; 06.08.2014
fold
не съществува в, да речем Scalding, защото за разлика от reduce
не изисква комутативност. Актуализирах отговора си, за да обясня това. Основно това, което се опитвам да кажа е, че разликата между fold*
и reduce*
е много свързана с корените на FP в теорията на категориите.
- person samthebest; 06.08.2014
.par
, така че (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
получавам различни резултати всеки път.
- person samthebest; 07.08.2014
fold
за RDD е точно същият като договора на Scala ParSeq.fold
. Въпреки че ParSeqs
обикновено (винаги?) спазва поръчката, това всъщност не е част от договора. Човек може лесно да внедри fold
(или не толкова лесно foldByKey
), който гарантира ред на сливане и основната причина да го направите, както се прави сега, е производителността / гъвкавостта на планирането.
- person zero323; 07.06.2016
reduce
, reduceByKey
, fold
, foldByKey
, aggregate
, aggregateByKey
, combineByKey
) трябва да получат функции, преминали, които са и двете, асоциативни и комутативни в крайна сметка, нали?
- person Make42; 08.06.2016
reallyFold
pimp като: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
, това няма да изисква f за пътуване до работа.
- person samthebest; 08.06.2016
(index, res) => results(index) = res
като манипулатор на резултата - така че манипулаторът на резултата използва индекса на дяла, за да постави резултата в Array
- person samthebest; 09.06.2016
List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par.reduce(_+_)
Предполагам, че трябва да ми даде произволен резултат, но получавам един и същ резултат всеки път.
- person altayseyhan; 16.02.2017
(List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
произвежда различни резултати във всеки момент, свързани с това, че операцията за разделяне не е асоциативна, имам предвид, че не е свързана с това, че е некомутативна [docs.scala-lang.org/overviews/parallel-collections/ моля, погледнете реда, започващ със Забележка: Често се смята, че че..
- person altayseyhan; 16.02.2017
Ако не греша, въпреки че API на Spark не го изисква, fold също изисква f да бъде комутативен. Тъй като редът, в който дяловете ще бъдат агрегирани, не е гарантиран. Например в следния код се сортира само първата разпечатка:
import org.apache.spark.{SparkConf, SparkContext}
object FoldExample extends App{
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Simple Application")
implicit val sc = new SparkContext(conf)
val range = ('a' to 'z').map(_.toString)
val rdd = sc.parallelize(range)
println(range.reduce(_ + _))
println(rdd.reduce(_ + _))
println(rdd.fold("")(_ + _))
}
Принтирам:
abcdefghijklmnopqrstuvwxyz
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
с 2+ ядра няколко пъти, мисля, че ще видите, че произвежда произволен (по отношение на дяловете) ред. Актуализирах отговора си съответно.
- person samthebest; 08.06.2016
fold
в Apache Spark не е същото като fold
в неразпространени колекции. Всъщност изисква комутативна функция за получаване на детерминистични резултати:
Това се държи малко по-различно от операциите за сгъване, реализирани за неразпределени колекции във функционални езици като Scala. Тази операция на сгъване може да се приложи към дялове поотделно и след това да се сгънат тези резултати в крайния резултат, вместо да се приложи сгъването към всеки елемент последователно в някакъв определен ред. За функции, които не са комутативни, резултатът може да се различава от този на сгъване, приложено към неразпределена колекция.
Това е показано от Mishael Rosenthal и предложено от Make42 в неговия коментар.
Предполага се, че наблюдаваното поведение е свързано с HashPartitioner
, когато всъщност parallelize
не разбърква и не използва HashPartitioner
.
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
Обяснено:
Структура на fold
за RDD
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
е същият като структура на reduce
за RDD:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
където runJob
се изпълнява без зачитане на реда на разделяне и води до нужда от комутативна функция.
foldPartition
и reducePartition
са еквивалентни по отношение на реда на обработка и ефективно (чрез наследяване и делегиране) се изпълняват от reduceLeft
и foldLeft
на TraversableOnce
.
Заключение: fold
на RDD не може да зависи от реда на блоковете и се нуждае от комутативност и асоциативност.
fold
на RDD
s наистина е точно същото като reduce
, но това не зачита основните математически разлики (актуализирах отговора си, за да бъда още по-ясен). Въпреки че не съм съгласен, че наистина се нуждаем от комутативност, при условие че човек е уверен, че каквото и да прави техният partioner, то запазва реда.
- person samthebest; 06.06.2016
runJob
, виждам, че той наистина извършва комбинирането според това кога задачата е завършена, НЕ според реда на дяловете. Именно този ключов детайл кара всичко да си дойде на мястото. Редактирах отговора си отново и по този начин коригирах грешката, която посочихте. Моля, можете ли да премахнете наградата си, тъй като вече сме съгласни?
- person samthebest; 07.06.2016
Друга разлика за Scalding е използването на комбинатори в Hadoop.
Представете си, че вашата операция е комутативен моноид, с reduce тя ще бъде приложена и от страната на картата, вместо да разбърква/сортира всички данни към редуктори. С foldLeft това не е така.
pipe.groupBy('product) {
_.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
// reduce is .mapReduceMap in disguise
}
pipe.groupBy('product) {
_.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}
Винаги е добра практика да дефинирате вашите операции като моноид в Scalding.