Изключение за сериализация при искра

Срещам много странен проблем на Spark относно сериализацията. Кодът е както следва:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(document: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

където документът е дефиниран като:

class Document(val tokens: SparseVector[Int]) extends Serializable

и DocumentParameter е:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVectors е сериализиращ се клас в breeze.linalg.SparseVector.

Това е проста процедура за карта и всички класове могат да се сериализират, но получавам това изключение:

org.apache.spark.SparkException: Task not serializable

Но когато премахна параметъра numOfTopics, това е:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

и го наречете така:

val docs = documents.map(DocumentParameter.apply)

и изглежда добре.

Тип Int не може ли да се сериализира? Но виждам, че някакъв код е написан така.

Не съм сигурен как да поправя тази грешка.

#АКТУАЛИЗИРАНО#:

Благодаря ти @samthebest. Ще добавя повече подробности за него.

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

Тъй като проследяването на стека дава обща информация за изключение, аз го премахнах.

Пускам кода в spark-shell.

// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Бихте ли ми дали някои уроци или съвети за сериализуемо?


person superhan    schedule 20.12.2014    source източник


Отговори (2)


Анонимните функции сериализират съдържащия ги клас. Когато map {doc => DocumentParameter(doc, numOfTopics)}, единственият начин, по който може да даде на тази функция достъп до numOfTopics, е да сериализира класа PLSA. И този клас всъщност не може да бъде сериализиран, защото (както можете да видите от stacktrace) той съдържа SparkContext, който не може да бъде сериализиран (лоши неща биха се случили, ако отделните клъстерни възли имат достъп до контекста и биха могли например да създават нови задания от в картограф).

Като цяло, опитайте се да избягвате съхраняването на SparkContext във вашите класове (редактирайте: или поне се уверете, че е много ясно какъв вид класове съдържат SparkContext и какви не); по-добре е да го подадете като (евентуално implicit) параметър на отделни методи, които се нуждаят от него. Като алтернатива, преместете функцията {doc => DocumentParameter(doc, numOfTopics)} в различен клас от PLSA, такъв, който наистина може да бъде сериализиран.

(Както много хора предложиха, възможно е да запазите SparkContext в класа, но маркиран като @transient, така че да не бъде сериализиран. Не препоръчвам този подход; това означава, че класът ще „магически“ промени състоянието си, когато се сериализира ( загуба на SparkContext), така че може да се окажете с NPE, когато се опитате да получите достъп до SparkContext от вътрешността на сериализирано задание. По-добре е да поддържате ясна разлика между класовете, които се използват само в "контролния" код (и може да използват SparkContext) и класове, които са сериализирани, за да работят в клъстера (който не трябва да има SparkContext)).

person lmm    schedule 20.12.2014
comment
Благодаря ти. Работи както предложихте. Освен това намирам друг начин за решаване на този проблем: добавете @transient преди val sc : SparkContext, тогава SparkContext няма да бъде сериализирано. - person superhan; 24.12.2014
comment
Не съм съгласен, че трябва да избягвате да съхранявате SparkContext във вашите класове изцяло (но все пак гласувате за). Ако не ги съхранявате в обхват, тогава можете да получите раздуване на параметрите (което е грозно дори когато използвате имплицитни параметри). Единствената алтернатива е да поставите в него някакъв глобален сингълтон някъде, което създава собствени проблеми (ужасяващи нулеви указатели). - person samthebest; 29.12.2014

Това наистина е странно, но мисля, че мога да отгатна проблема. Но първо, не сте предоставили необходимия минимум за решаване на проблема (мога да се досетя, защото съм виждал 100 от тях преди). Ето някои проблеми с вашия въпрос:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
  val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
}

Този метод не връща RDD[DocumentParameter], а Unit. Трябва да сте копирали и поставили кода неправилно.

Второ, не сте предоставили цялото проследяване на стека? Защо? Няма причина да НЕ предоставите пълното проследяване на стека, а пълното проследяване на стека със съобщение е необходимо, за да разберете грешката - човек се нуждае от цялата грешка, за да разбере каква е грешката. Обикновено изключение, което не може да бъде сериализирано, ви казва какво не може да бъде сериализирано.

Трето, не ни казахте къде е методът infer, правите ли това в обвивка? Какъв е съдържащият се обект/клас/характеристика и т.н. на infer?

Както и да е, предполагам, че като подадете Int, вие причинявате сериализиране на верига от неща, които не очаквате, не мога да ви дам повече информация от това, докато не предоставите минималния код, за да можем разбирам напълно проблема ви.

person samthebest    schedule 20.12.2014