Разлика между намаляване и foldLeft/fold във функционалното програмиране (по-специално Scala и Scala API)?

Защо Scala и рамки като Spark и Scalding имат както reduce, така и foldLeft? Тогава каква е разликата между reduce и fold?


person samthebest    schedule 06.08.2014    source източник
comment
Вижте също: stackoverflow.com/questions/16111440/scala -fold-vs-foldleft/   -  person axel22    schedule 06.08.2014
comment
Също така преразгледана разликата между сгъване и намаляване   -  person Alper t. Turker    schedule 11.05.2018


Отговори (4)


намаляване срещу foldLeft

Голяма голяма разлика, която не се споменава в нито един друг отговор на stackoverflow, свързан ясно с тази тема, е, че на reduce трябва да се даде комутативен моноид, т.е. операция, която е едновременно комутативна и асоциативна. Това означава, че операцията може да бъде паралелизирана.

Това разграничение е много важно за големи данни / MPP / разпределени изчисления и цялата причина, поради която reduce дори съществува. Колекцията може да бъде нарязана и reduce може да работи с всяка част, след което reduce може да работи с резултатите от всяка част - всъщност нивото на разделяне не трябва да спира едно ниво навътре. Можем също да накълцаме всяко парче. Ето защо сумирането на цели числа в списък е O(log N), ако са дадени безкраен брой процесори.

Ако просто погледнете подписите, няма причина reduce да съществува, защото можете да постигнете всичко, което можете с reduce с foldLeft. Функционалността на foldLeft е по-голяма от функционалността на reduce.

Но не можете да паралелизирате foldLeft, така че времето му за изпълнение винаги е O(N) (дори ако захранвате комутативен моноид). Това е така, защото се приема, че операцията не е комутативен моноид и така натрупаната стойност ще бъде изчислена чрез поредица от последователни агрегации.

foldLeft не предполага комутативност, нито асоциативност. Това е асоциативността, която дава възможност за нарязване на колекцията, и това е комутативността, която прави натрупването лесно, тъй като редът не е важен (така че няма значение в кой ред да се агрегират всеки от резултатите от всяко от парчетата). Строго погледнато комутативността не е необходима за паралелизиране, например алгоритми за разпределено сортиране, тя просто прави логиката по-лесна, защото не е необходимо да подреждате вашите парчета.

Ако погледнете документацията на Spark за reduce, тя изрично казва "... комутативен и асоциативен двоичен оператор"

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Ето доказателство, че reduce НЕ е само частен случай на foldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

намаляване срещу сгъване

Сега това е мястото, където става малко по-близо до FP / математическите корени и е малко по-трудно за обяснение. Reduce се дефинира официално като част от парадигмата MapReduce, която се занимава с колекции без подреждане (мултисетове), Fold е формално дефиниран от гледна точка на рекурсия (вижте катаморфизъм) и по този начин приема структура/последователност на колекциите.

Няма fold метод в Scalding, тъй като при (строгия) програмен модел Map Reduce не можем да дефинираме fold, тъй като парчетата нямат подреждане и fold изисква само асоциативност, не комутативност.

Казано просто, reduce работи без ред на натрупване, fold изисква ред на натрупване и именно този ред на натрупване изисква нулева стойност, НЕ съществуването на нулева стойност, която ги отличава. Строго погледнато reduce трябва да работи върху празна колекция, защото нейната нулева стойност може да се изведе чрез вземане на произволна стойност x и след това решаване на x op y = x, но това не работи с некомутативна операция, тъй като може съществуват лява и дясна нулева стойност, които са различни (т.е. x op y != y op x). Разбира се, Scala не си прави труда да определи каква е тази нулева стойност, тъй като това би изисквало извършване на някои математически изчисления (които вероятно са неизчислими), така че просто хвърля изключение.

Изглежда (както често се случва в етимологията), че това първоначално математическо значение е изгубено, тъй като единствената очевидна разлика в програмирането е подписът. Резултатът е, че reduce се е превърнало в синоним на fold, вместо да запази първоначалното си значение от MapReduce. Сега тези термини често се използват взаимозаменяемо и се държат по същия начин в повечето реализации (игнорирайки празни колекции). Странността се изостря от особености, като в Spark, които сега ще разгледаме.

Така че Spark има fold, но редът, в който подрезултатите (по един за всеки дял) се комбинират (по време на писане) е същият ред, в който се изпълняват задачите - и следователно не- детерминистичен. Благодаря на @CafeFeed, че посочи, че fold използва runJob, което след прочитане на кода разбрах, че не е детерминирано. Допълнително объркване се създава от това, че Spark има treeReduce, но не и treeFold.

Заключение

Има разлика между reduce и fold дори когато се прилага към непразни последователности. Първото е дефинирано като част от парадигмата за програмиране MapReduce върху колекции с произволен ред (http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf) и трябва да приемем, че операторите са комутативни в допълнение към това, че са асоциативни, за да дават детерминистични резултати. Последният се дефинира от гледна точка на катоморфизми и изисква колекциите да имат понятие за последователност (или да се дефинират рекурсивно, като свързани списъци), следователно не изискват комутативни оператори.

На практика поради нематематичния характер на програмирането, reduce и fold са склонни да се държат по един и същи начин, или правилно (като в Scala), или неправилно (като в Spark).

Допълнително: Моето мнение за API на Spark

Моето мнение е, че объркването ще бъде избегнато, ако използването на термина fold бъде изцяло премахнато в Spark. Поне Spark има бележка в документацията си:

Това се държи малко по-различно от операциите за сгъване, реализирани за неразпределени колекции във функционални езици като Scala.

person samthebest    schedule 06.08.2014
comment
Ето защо foldLeft съдържа Left в името си и защо има и метод, наречен fold. - person kiritsuku; 06.08.2014
comment
@sschaef вярно, но fold не съществува в, да речем Scalding, защото за разлика от reduce не изисква комутативност. Актуализирах отговора си, за да обясня това. Основно това, което се опитвам да кажа е, че разликата между fold* и reduce* е много свързана с корените на FP в теорията на категориите. - person samthebest; 06.08.2014
comment
Имате ли пример за различен (т.е. x op y != y op x), като използвате намаляване и сгъване, за да видите разликата. Не мисля, че е правилно твърдението за намаляване. Например, използвам некомутативна операция деление, за да направя редуцирането. (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).reduce(_ / _) За израза (т.е. x op y != y op x). трябва да ми даде произволен резултат, но ми дава един и същ резултат всеки път, така че мисля, че намаляването все още запазва реда на натрупване - person Xiaohe Dong; 07.08.2014
comment
@Cloudtech Това е съвпадение на внедряването му с една нишка, а не в рамките на неговата спецификация. На моята 4-ядрена машина, ако се опитам да добавя .par, така че (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) получавам различни резултати всеки път. - person samthebest; 07.08.2014
comment
Страхотен отговор. Мисля, че на reduce трябва да се даде комутативен моноид, т.е. операция, която е едновременно комутативна и асоциативна, трябва да гласи: на reduce трябва да се даде комутативна полугрупа, т.е. структура от данни, която поддържа комбинираща операция, която е едновременно комутативна и асоциативна. Не вярвам, че намаляването се нуждае от елемент на идентичност. - person Alex Dean; 24.11.2014
comment
@AlexDean в контекста на компютърните науки, не, всъщност не се нуждае от идентичност, тъй като празните колекции са склонни просто да хвърлят изключения. Но е математически по-елегантно (и би било по-елегантно, ако колекциите правят това), ако елементът за идентичност се връща, когато колекцията е празна. В математиката хвърляне на изключение не съществува. - person samthebest; 25.11.2014
comment
@samthebest - Разбирам какво казваш, благодаря. Ето защо трябва да работите със sumOption за полугрупи в Algebird... - person Alex Dean; 26.11.2014
comment
@samthebest: Сигурен ли си за комутативността? github.com/apache/spark/blob/ казва За функции, които не са комутативни, резултатът може да се различава от този на сгъване, приложено към неразпределена колекция. - person Make42; 17.05.2016
comment
Ако трябва да сме честни, договорът на fold за RDD е точно същият като договора на Scala ParSeq.fold. Въпреки че ParSeqs обикновено (винаги?) спазва поръчката, това всъщност не е част от договора. Човек може лесно да внедри fold (или не толкова лесно foldByKey), който гарантира ред на сливане и основната причина да го направите, както се прави сега, е производителността / гъвкавостта на планирането. - person zero323; 07.06.2016
comment
@Make42 Актуализирах отговора си, наистина предположих, че редът на комбиниране е направен по реда на дяловете (което би било хубаво и логичният начин да се приложи), но се оказва, че се основава на по-скоро първи дошъл първи обслужен и следователно недетерминиран. - person samthebest; 07.06.2016
comment
@samthebest: И така, в Spark, всички методи на комбиниране (reduce, reduceByKey, fold, foldByKey, aggregate, aggregateByKey, combineByKey) трябва да получат функции, преминали, които са и двете, асоциативни и комутативни в крайна сметка, нали? - person Make42; 08.06.2016
comment
@Make42 Това е правилно, все пак човек може да напише свой собствен reallyFold pimp като: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f), това няма да изисква f за пътуване до работа. - person samthebest; 08.06.2016
comment
@samthebest: Не съм сигурен, че това работи... The collect също получава дяловете първи дошъл, първи обслужен. Откъде знаеш, че са в правилния ред за второто сгъване? Опитах това с реален пример и върнах думите си в правилния ред както с вашия код, така и с оригиналния фолд на Spark. Подозирам, че в рамките на сгъване на дял в Spark няма да се нуждае от комутативност, но проблемът е поставянето на дяловете заедно. Вашето решение не подобрява това, тъй като събирането ще има същия проблем. Какво мислиш? - person Make42; 09.06.2016
comment
@Make42 Прочетох кода за collect, той запазва реда на дяловете. Преминете през него и ще видите това (index, res) => results(index) = res като манипулатор на резултата - така че манипулаторът на резултата използва индекса на дяла, за да постави резултата в Array - person samthebest; 09.06.2016
comment
@samthebest, така че в този момент имам въпрос afaik string concat е некомутативен, когато стартирам този код няколко пъти List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par.reduce(_+_) Предполагам, че трябва да ми даде произволен резултат, но получавам един и същ резултат всеки път. - person altayseyhan; 16.02.2017
comment
@Cloudtech @samthebest Предполагам, че в този случай (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) произвежда различни резултати във всеки момент, свързани с това, че операцията за разделяне не е асоциативна, имам предвид, че не е свързана с това, че е некомутативна [docs.scala-lang.org/overviews/parallel-collections/ моля, погледнете реда, започващ със Забележка: Често се смята, че че.. - person altayseyhan; 16.02.2017
comment
Наистина мразя да нарушавам перфектния брой от 256 гласа, но това е отличен отговор!! - person jrista; 21.07.2020

Ако не греша, въпреки че API на Spark не го изисква, fold също изисква f да бъде комутативен. Тъй като редът, в който дяловете ще бъдат агрегирани, не е гарантиран. Например в следния код се сортира само първата разпечатка:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Принтирам:

abcdefghijklmnopqrstuvwxyz

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

person Mishael Rosenthal    schedule 16.03.2015
comment
След малко напред-назад вярваме, че сте прав. Редът на комбиниране е първи дошъл първи сервиран. Ако стартирате sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _) с 2+ ядра няколко пъти, мисля, че ще видите, че произвежда произволен (по отношение на дяловете) ред. Актуализирах отговора си съответно. - person samthebest; 08.06.2016

fold в Apache Spark не е същото като fold в неразпространени колекции. Всъщност изисква комутативна функция за получаване на детерминистични резултати:

Това се държи малко по-различно от операциите за сгъване, реализирани за неразпределени колекции във функционални езици като Scala. Тази операция на сгъване може да се приложи към дялове поотделно и след това да се сгънат тези резултати в крайния резултат, вместо да се приложи сгъването към всеки елемент последователно в някакъв определен ред. За функции, които не са комутативни, резултатът може да се различава от този на сгъване, приложено към неразпределена колекция.

Това е показано от Mishael Rosenthal и предложено от Make42 в неговия коментар.

Предполага се, че наблюдаваното поведение е свързано с HashPartitioner, когато всъщност parallelize не разбърква и не използва HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Обяснено:

Структура на fold за RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

е същият като структура на reduce за RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

където runJob се изпълнява без зачитане на реда на разделяне и води до нужда от комутативна функция.

foldPartition и reducePartition са еквивалентни по отношение на реда на обработка и ефективно (чрез наследяване и делегиране) се изпълняват от reduceLeft и foldLeft на TraversableOnce.

Заключение: fold на RDD не може да зависи от реда на блоковете и се нуждае от комутативност и асоциативност.

person Community    schedule 05.06.2016
comment
Трябва да призная, че етимологията е объркваща и в литературата по програмиране липсват официални дефиниции. Мисля, че е безопасно да се каже, че fold на RDDs наистина е точно същото като reduce, но това не зачита основните математически разлики (актуализирах отговора си, за да бъда още по-ясен). Въпреки че не съм съгласен, че наистина се нуждаем от комутативност, при условие че човек е уверен, че каквото и да прави техният partioner, то запазва реда. - person samthebest; 06.06.2016
comment
Недефинираният ред на сгъване не е свързан с разделянето. Това е пряко следствие от изпълнението на runJob. - person ; 06.06.2016
comment
АХ! Съжалявам, не можах да разбера каква е вашата гледна точка, но след като прочетох кода runJob, виждам, че той наистина извършва комбинирането според това кога задачата е завършена, НЕ според реда на дяловете. Именно този ключов детайл кара всичко да си дойде на мястото. Редактирах отговора си отново и по този начин коригирах грешката, която посочихте. Моля, можете ли да премахнете наградата си, тъй като вече сме съгласни? - person samthebest; 07.06.2016
comment
Не мога да редактирам или премахвам - няма такава опция. Мога да присъдя, но мисля, че получавате доста точки само от внимание, греша ли? Ако потвърдите, че искате да наградя, ще го направя в следващите 24 часа. Благодаря за корекциите и съжалявам за метода, но изглежда, че пренебрегвате всички предупреждения, това е голямо нещо и отговорът е цитиран навсякъде. - person ; 07.06.2016
comment
Какво ще кажете да го присъдите на @Mishael Rosenthal, тъй като той беше първият, който ясно изрази загрижеността. Не се интересувам от точките, просто обичам да използвам SO за SEO и организация. - person samthebest; 08.06.2016

Друга разлика за Scalding е използването на комбинатори в Hadoop.

Представете си, че вашата операция е комутативен моноид, с reduce тя ще бъде приложена и от страната на картата, вместо да разбърква/сортира всички данни към редуктори. С foldLeft това не е така.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

Винаги е добра практика да дефинирате вашите операции като моноид в Scalding.

person morazow    schedule 07.08.2014