Объяснение кратного метода искрового РДД

Я запускаю Spark-1.4.0, предварительно созданный для Hadoop-2.4 (в локальном режиме), чтобы вычислить сумму квадратов DoubleRDD. Мой код Scala выглядит так

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

И это дало удивительный результат 97.0.

Это довольно нелогично по сравнению с версией fold для Scala.

Array(2., 3.).fold(0.0)((p, v) => p+v*v)

что дает ожидаемый ответ 13.0.

Кажется вполне вероятным, что я допустил несколько коварных ошибок в коде из-за непонимания. Я читал о том, что функция, используемая в RDD.fold(), должна быть коммуникативной, иначе результат может зависеть от разделов и т. Д. Например, если я изменю количество разделов на 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

код даст мне 169.0 на моей машине!

Может кто-нибудь объяснить, что именно здесь происходит?


person dolaameng    schedule 17.07.2015    source источник


Ответы (1)


На самом деле это довольно хорошо объясняется в официальная документация:

Агрегируйте элементы каждого раздела, а затем результаты для всех разделов, используя заданную ассоциативную и коммутативную функцию и нейтральное «нулевое значение». Функция op(t1, t2) может изменять t1 и возвращать его в качестве результата, чтобы избежать выделения объекта; однако он не должен изменять t2.

Это ведет себя несколько иначе, чем операции свертывания, реализованные для нераспределенных коллекций в функциональных языках, таких как Scala. Эта операция сворачивания может применяться к разбиениям по отдельности, а затем складывать эти результаты в окончательный результат, а не применять свертку к каждому элементу последовательно в некотором определенном порядке. Для некоммутативных функций результат может отличаться от результата свертки, примененной к нераспределенной коллекции.

Чтобы проиллюстрировать, что происходит, давайте попробуем смоделировать происходящее шаг за шагом:

val rdd = sc.parallelize(Array(2., 3.))

val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => (p +  v * v))).toIterator).collect()

Это дает нам что-то похожее на это Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0) и

byPartition.reduce((p, v) => (p + v * v))

возвращает 97

Важно отметить, что результаты могут отличаться от запуска к запуску в зависимости от порядка объединения разделов.

person zero323    schedule 17.07.2015
comment
Спасибо! Ваше объяснение подтвердило мое подозрение - (2*2) * (2*2) + (3*3) * (3*3) = 97. А во втором случае это как (2*2 + 3*3) * (2*2 + 3*3) = 169. Я предполагаю, что именно поэтому foldLeft и foldRight не имеют соответствия в spark - порядок просто не имеет смысла в распределенном фолде. - person dolaameng; 17.07.2015
comment
На самом деле в следующем случае Array(2., 3.) вы можете получить даже 25 или 85, отсюда и мой последний комментарий. Вообще говоря, если функция не является коммутативной и ассоциативной, могут происходить странные вещи. Правильный способ обработки вещей здесь - это отобразить, а затем свернуть: rdd.map(x => x * x).fold(0.0)(_ + _). - person zero323; 17.07.2015
comment
Что интересно, PySpark, кажется, обрабатывает вещи по-другому и возвращает 13 для (2., 3.) ввода O_o с тем же количеством разделов. - person zero323; 17.07.2015
comment
Я думаю, что у pyspark тоже есть такое же ограничение, см. следующие результаты: sc.parallelize([2., 3.]).fold(0.0, lambda p,v : p+v*v) генерирует 49 sc.parallelize([2., 3.], 100).fold(0.0, lambda p,v : p+v*v) генерирует 13 - person dolaameng; 17.07.2015
comment
Да, это имеет. Он просто применяет функцию в op(obj, acc) вместо op(acc, obj), поэтому, если в разделе есть только один элемент, он не возводится в квадрат на этапе mapPartitions :) - person zero323; 17.07.2015