Карта Spark RDD в режиме пряжи не разрешает доступ к переменным?

У меня есть новая установка spark 1.2.1 через кластер mapr, и во время тестирования я обнаружил, что он хорошо работает в локальном режиме, но в режимах пряжи он, похоже, не может получить доступ к переменным, даже если они транслируются. Чтобы быть точным, следующий тестовый код

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object JustSpark extends App {
val conf = new org.apache.spark.SparkConf().setAppName("SimpleApplication")
val sc = new SparkContext(conf)
val a = List(1,3,4,5,6)
val b = List("a","b","c")
val bBC= sc.broadcast(b)
val data = sc.parallelize(a)
val transform = data map ( t => { "hi" })
transform.take(3) foreach (println _)
val transformx2 = data map ( t => { bBC.value.size })
transformx2.take(3) foreach (println _)
//val transform2 = data map ( t => { b.size })
//transform2.take(3) foreach (println _)
}

работает в локальном режиме, но не работает в пряже. Точнее, оба метода, transform2 и transformx2, не работают, и все они работают, если --master local[8].

Я компилирую его с помощью sbt и отправляю с помощью инструмента отправки.

/opt/mapr/spark/spark-1.2.1/bin/spark-submit --class JustSpark --master yarn target/scala-2.10/simulator_2.10-1.0.jar

Есть идеи, что происходит? В сообщении об ошибке просто утверждается, что есть исключение нулевого указателя java в том месте, где он должен обращаться к переменной. Есть ли другой способ передачи переменных внутри карт RDD?


person arivero    schedule 14.03.2015    source источник
comment
Определить неудачу? это важно, и вы не сказали, какая строка или какая ошибка.   -  person Sean Owen    schedule 15.03.2015


Ответы (3)



Я предполагаю, что виновником был

val transform2 = data map ( t => { b.size })

В частности, доступ к локальной переменной b . На самом деле вы можете увидеть в своих файлах журнала java.io.NotSerializableException .

Что должно произойти: Spark попытается сериализовать любой объект, на который есть ссылка. В данном случае это означает весь класс JustSpark, поскольку имеется ссылка на один из его членов.

Почему это не удалось? Ваш класс не поддерживает сериализацию. Поэтому Spark не может отправить его по сети. В частности, у вас есть ссылка на SparkContext, которая не расширяет Serializable.

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

Итак, ваш первый код, который передает только значение переменной, является правильным.

person WestCoastProjects    schedule 14.03.2015
comment
b определенно сериализуем здесь как список строк. Это не ошибка - я думаю. Он говорит, что это NPE где-то. - person Sean Owen; 15.03.2015
comment
Оки Доки. Я ни разу не пользовался приложением, поэтому не знал его недостатков. - person WestCoastProjects; 18.03.2015

Это оригинальный пример вещания из источников spark, измененный для использования списков вместо массивов:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object MultiBroadcastTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test")
val sc = new SparkContext(sparkConf)
val slices = if (args.length > 0) args(0).toInt else 2
val num = if (args.length > 1) args(1).toInt else 1000000
val arr1 = (1 to num).toList
val arr2 = (1 to num).toList
val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
  (barr1.value.size, barr2.value.size)
}
observedSizes.collect().foreach(i => println(i))
sc.stop()
}}

Я скомпилировал его в своей среде, и он работает.

Так в чем же разница?

Проблемный пример использует extends App, в то время как исходный пример представляет собой простой синглтон.

Поэтому я понизил код до функции «doIt()».

object JustDoSpark extends App{
def doIt() {
...
}
doIt()

И угадайте, что. Это сработало.

Конечно, проблема действительно связана с сериализацией, но по-другому. Наличие кода в теле объекта, кажется, вызывает проблемы.

person arivero    schedule 14.03.2015
comment
На самом деле это связано с delayedInit из приложения, а не с самой сериализацией. - person Sean Owen; 15.03.2015
comment
И, согласно github.com/jongwook, даже определение можно затенить, просто extends App {{ ... }} будет работать. См. комментарии на странице github.com/apache/spark/pull/3497. - person arivero; 15.03.2015