Ошибка сериализации Spark Kryo

У меня есть фрагмент кода Spark, который работал на Spark 1.3, но не работает, когда я перемещаю его на Spark 1.5.2 (обновление кластера вне моего контроля). Неисправность выглядит следующим образом:

Caused by: java.io.NotSerializableException: com.location.model.Profile
Serialization stack:
    - object not serializable (class: com.location.model.Profile, value: com.location.model.Profile@596032b0)
    - field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, name: zeroValue$3, type: class java.lang.Object)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1, <function0>)
    - field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, name: $outer, type: class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$1, <function0>)
    - field (class: org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, name: createZero$1, type: interface scala.Function0)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1$$anonfun$apply$10, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

Интересно то, что рассматриваемый класс Profile объявлен как class Profile() extends KryoSerializable и переопределяет методы чтения / записи для этого интерфейса.

Я также установил эту конфигурацию на Spark-submit: "--conf" -> "'spark.serializer=org.apache.spark.serializer.KryoSerializer'" и зарегистрировал класс Profile в Kryo, выполнив conf.registerKryoClasses(Array( classOf[Profile], ...

Итак, все в соответствии с инструкциями в руководстве по настройке Spark, и раньше это работало хорошо. Обратите внимание, что исключение показывает, что JavaSerializerInstance используется ClosureCleaner, и действительно, если я добавлю extends Serializable к классу Profile, он будет работать. Но я не уверен, почему он использует этот сериализатор и почему я должен быть совместим с сериализацией Java, если я специально прошу Kryo.


Изменить: я даже полностью удалил параметр, так как код под registerKryoClasses в любом случае устанавливает свойство. На самом деле, я подозреваю, что используется сериализация Kryo (я добавил println внутри write, и он появляется, но какая-то предыдущая проверка неверна).


person Daniel Langdon    schedule 16.03.2016    source источник


Ответы (1)


Вы пытались удалить 'из своей отправки, имхо это должно быть

--conf "spark.serializer = org.apache.spark.serializer.KryoSerializer"

Вы случайно не отправляете от Луиджи?

person Igor Berman    schedule 16.03.2016
comment
Я спрашивал об удалении одинарных кавычек, которые обертывают ваш параметр conf и не удаляют его полностью - person Igor Berman; 18.03.2016
comment
Тот же эффект, вы в основном указываете, правильно ли установлена ​​конфигурация. - person Daniel Langdon; 18.03.2016
comment
Да, в самом деле. еще один момент, о котором я могу подумать - почему вы регистрируете массив профиля, а не сам профиль? - похоже, что у Spark проблемы с закрытием, у которого в качестве параметра указан Profile (из stacktrace) - person Igor Berman; 18.03.2016
comment
Я не делаю. registerKryoClasses ожидает, что будет зарегистрирован массив классов. Profile - лишь первый из этих классов. То, что вы упомянули, будет выглядеть как classOf[Array[Profile]]. Я ценю помощь :-) - person Daniel Langdon; 18.03.2016
comment
и, к сожалению, никакого замыкания нет, строка 107 на самом деле .aggregateByKey(new Profile(), 3200), я просто указываю нулевое значение :-( - person Daniel Langdon; 18.03.2016