Исключение сериализации на искре

Я встречаю очень странную проблему в Spark по поводу сериализации. Код выглядит следующим образом:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(document: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

где документ определяется как:

class Document(val tokens: SparseVector[Int]) extends Serializable

и DocumentParameter:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVector — это сериализуемый класс в breeze.linalg.SparseVector.

Это простая процедура карты, и все классы сериализуемы, но я получаю это исключение:

org.apache.spark.SparkException: Task not serializable

Но когда я удаляю параметр numOfTopics, то есть:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

и назовите это так:

val docs = documents.map(DocumentParameter.apply)

и вроде нормально.

Является ли тип Int несериализуемым? Но я вижу, что какой-то код написан именно так.

Я не уверен, как исправить эту ошибку.

#ОБНОВЛЕНО#:

Спасибо @samthebest. Я добавлю больше подробностей об этом.

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

Поскольку трассировка стека дает общую информацию об исключении, я удалил ее.

Я запускаю код в искровой оболочке.

// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Не могли бы вы дать мне несколько руководств или советов по сериализации?


person superhan    schedule 20.12.2014    source источник


Ответы (2)


Анонимные функции сериализуют содержащий их класс. Когда вы map {doc => DocumentParameter(doc, numOfTopics)}, единственный способ предоставить этой функции доступ к numOfTopics — это сериализовать класс PLSA. И этот класс на самом деле не может быть сериализован, потому что (как вы можете видеть из трассировки стека) он содержит SparkContext, который не сериализуем (плохие вещи могли бы произойти, если бы отдельные узлы кластера имели доступ к контексту и могли бы, например, создавать новые задания из внутри картографа).

В общем, старайтесь избегать хранения SparkContext в своих классах (отредактируйте: или, по крайней мере, убедитесь, что очень ясно, какие классы содержат SparkContext, а какие нет); лучше передать его как параметр (возможно, implicit) отдельным методам, которым он нужен. В качестве альтернативы переместите функцию {doc => DocumentParameter(doc, numOfTopics)} в класс, отличный от PLSA, который действительно можно сериализовать.

(Как предложили несколько человек, можно сохранить SparkContext в классе, но пометить его как @transient, чтобы он не сериализовался. Я не рекомендую этот подход; это означает, что класс "волшебным образом" изменит состояние при сериализации ( потеря SparkContext), и поэтому вы можете получить NPE, когда пытаетесь получить доступ к SparkContext из сериализованного задания. Лучше поддерживать четкое различие между классами, которые используются только в «управляющем» коде (и могут использовать SparkContext) и классы, сериализованные для работы в кластере (который не должен иметь SparkContext)).

person lmm    schedule 20.12.2014
comment
Спасибо. Это работает, как вы предложили. Также я нахожу другой способ решить эту проблему: добавить @transient перед val sc : SparkContext, тогда SparkContext сериализоваться не будет. - person superhan; 24.12.2014
comment
Я не согласен с тем, что вам следует полностью избегать хранения SparkContext в своих классах (но, тем не менее, за это проголосовали). Если вы не сохраните их в области видимости, вы можете получить раздувание параметров (что некрасиво даже при использовании неявных параметров). Единственная альтернатива - вставить в него какой-нибудь глобальный синглтон, который сам по себе создает проблемы (ужасные нулевые указатели). - person samthebest; 29.12.2014

Это действительно странно, но я думаю, что могу догадаться, в чем проблема. Но, во-первых, вы не предоставили минимума для решения проблемы (я могу догадаться, потому что я видел сотни таких раньше). Вот некоторые проблемы с вашим вопросом:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
  val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
}

Этот метод не возвращает RDD[DocumentParameter], он возвращает Unit. Вы, должно быть, неправильно скопировали и вставили код.

Во-вторых, вы не предоставили всю трассировку стека? Почему? Нет никаких причин НЕ предоставлять полную трассировку стека, а полная трассировка стека с сообщением необходима для понимания ошибки - нужна вся ошибка, чтобы понять, что это за ошибка. Обычно несериализуемое исключение сообщает вам, что не сериализуемо.

В-третьих, вы не сказали нам, где находится метод infer, вы делаете это в оболочке? Что такое содержащий объект/класс/признак и т. д. infer?

В любом случае, я собираюсь предположить, что, передавая Int, вы вызываете сериализацию цепочки вещей, которую вы не ожидаете, я не могу дать вам больше информации, чем это, пока вы не предоставите минимальный код, чтобы мы могли полностью понять вашу проблему.

person samthebest    schedule 20.12.2014