У меня есть фрагмент кода Spark, который работал на Spark 1.3, но не работает, когда я перемещаю его на Spark 1.5.2 (обновление кластера вне моего контроля). Неисправность выглядит следующим образом:
Caused by: java.io.NotSerializableException: com.location.model.Profile
Serialization stack:
- object not serializable (class: com.location.model.Profile, value: com.location.model.Profile@596032b0)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, name: zeroValue$3, type: class java.lang.Object)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, <function0>)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, name: $outer, type: class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, <function0>)
- field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, name: createZero$1, type: interface scala.Function0)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
Интересно то, что рассматриваемый класс Profile
объявлен как class Profile() extends KryoSerializable
и переопределяет методы чтения / записи для этого интерфейса.
Я также установил эту конфигурацию на Spark-submit: "--conf" -> "'spark.serializer=org.apache.spark.serializer.KryoSerializer'"
и зарегистрировал класс Profile
в Kryo, выполнив conf.registerKryoClasses(Array(
classOf[Profile], ...
Итак, все в соответствии с инструкциями в руководстве по настройке Spark, и раньше это работало хорошо. Обратите внимание, что исключение показывает, что JavaSerializerInstance
используется ClosureCleaner
, и действительно, если я добавлю extends Serializable
к классу Profile
, он будет работать. Но я не уверен, почему он использует этот сериализатор и почему я должен быть совместим с сериализацией Java, если я специально прошу Kryo.
Изменить: я даже полностью удалил параметр, так как код под registerKryoClasses
в любом случае устанавливает свойство. На самом деле, я подозреваю, что используется сериализация Kryo (я добавил println внутри write
, и он появляется, но какая-то предыдущая проверка неверна).