Обяснение на метода на сгъване на искра RDD

Изпълнявам Spark-1.4.0, предварително изграден за Hadoop-2.4 (в локален режим), за да изчисля сумата от квадрати на DoubleRDD. Моят Scala код изглежда така

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

И даде изненадващ резултат 97.0.

Това е доста контраинтуитивно в сравнение с версията на Scala на fold

Array(2., 3.).fold(0.0)((p, v) => p+v*v)

което дава очаквания отговор 13.0.

Изглежда доста вероятно да съм направил някои трудни грешки в кода поради липса на разбиране. Прочетох за това как функцията, използвана в RDD.fold(), трябва да бъде комуникативна, в противен случай резултатът може да зависи от дялове и т.н. Например, ако променя броя на дяловете на 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

кодът ще ми даде 169.0 на моята машина!

Може ли някой да обясни какво точно се случва тук?


person dolaameng    schedule 17.07.2015    source източник


Отговори (1)


Е, всъщност е доста добре обяснено от официална документация:

Агрегирайте елементите на всеки дял и след това резултатите за всички дялове, като използвате дадена асоциативна и комутативна функция и неутрална „нулева стойност“. На функцията op(t1, t2) е позволено да модифицира t1 и да го върне като резултатна стойност, за да избегне разпределението на обекти; обаче не трябва да променя t2.

Това се държи малко по-различно от операциите за сгъване, реализирани за неразпределени колекции във функционални езици като Scala. Тази операция на сгъване може да се приложи към дялове поотделно и след това да се сгънат тези резултати в крайния резултат, вместо да се приложи сгъването към всеки елемент последователно в някакъв определен ред. За функции, които не са комутативни, резултатът може да се различава от този на сгъване, приложено към неразпределена колекция.

За да илюстрираме какво се случва, нека се опитаме да симулираме какво се случва стъпка по стъпка:

val rdd = sc.parallelize(Array(2., 3.))

val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => (p +  v * v))).toIterator).collect()

Това ни дава нещо подобно на това Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0) и

byPartition.reduce((p, v) => (p + v * v))

връща 97

Важно е да се отбележи, че резултатите могат да се различават от изпълнение до изпълнение в зависимост от реда, в който се комбинират дяловете.

person zero323    schedule 17.07.2015
comment
Благодаря! Вашето обяснение потвърди моето подозрение - (2*2) * (2*2) + (3*3) * (3*3) = 97. А във втория случай е като (2*2 + 3*3) * (2*2 + 3*3) = 169. Предполагам, че това е причината foldLeft и foldRight да нямат съответствие в spark - редът просто няма смисъл в distributed fold. - person dolaameng; 17.07.2015
comment
Всъщност в следния случай Array(2., 3.) можете дори да получите 25 или 85, оттук и последният ми коментар. Най-общо казано, ако функцията не е комутативна и асоциативна, могат да се случат странни неща. Правилният начин за справяне с нещата тук би бил картирането и след това фолдването: rdd.map(x => x * x).fold(0.0)(_ + _). - person zero323; 17.07.2015
comment
Какво е интересно PySpark изглежда се справя с нещата по различен начин и връща 13 за (2., 3.) вход O_o, със същия брой дялове. - person zero323; 17.07.2015
comment
Мисля, че pyspark също има същото ограничение, вижте следните резултати sc.parallelize([2., 3.]).fold(0.0, lambda p,v : p+v*v) генерира 49 sc.parallelize([2., 3.], 100).fold(0.0, lambda p,v : p+v*v) генерира 13 - person dolaameng; 17.07.2015
comment
Да то има. Той просто прилага функцията в op(obj, acc) вместо в op(acc, obj), така че ако има само един елемент в дяла, той не е на квадрат във фазата mapPartitions :) - person zero323; 17.07.2015