Историята е, когато се опитвам да разреша проблема с големите данни в нашата работа на Spark, която събираше всички данни към възела на драйвера, вместо да получава и обработва данни на всеки дял сега, след което получи грешка при сериализиране

org.apache.spark.SparkException: Task not serializable

Първото нещо, което направих, е да се опитам да намеря подходяща информация за тази грешка в Stack Overflow, Medium и т.н. ... За щастие получих някои съвети (препратки в приложението), за които мисля, че ми помогнаха да науча повече как протича работата на Spark, каква е мързеливата оценка и някои бележки, когато използваме flatMap или mapPartition и т.н. ... И ще споделя тук за всеки, който получи същия проблем като мен, може да спести много време за разрешаване на вашия проблем.

Защо се нуждаем от сериализация в задачата на Spark?

Механизмът на Spark ще раздели задание на множество дъщерни задания, изпратени до изпълнителите в клъстерната мрежа на Spark и ще се изпълнява върху това. За да направи това, Spark ще сериализира обекта в байтов поток, който може да бъде върнат обратно в копие на обекта.

И ако има променлива, препратка във вашия обект не може да бъде сериализирана, тогава Spark ще изведе грешка Task not serializable.

Как да го разрешим?

Трудно е да се каже какво точно е хванало за проблеми със сериализацията, защото зависи от кода на работата ви в Spark. Но има някои съвети, които трябва да опитате:

  • Декларирайте вашите променливи и функции вътре в Object колкото е възможно повече. Един обект изглежда като глобална променлива и можем да използваме навсякъде във вашия код.
  • Декларирайте собствения си клас за разширяване Serializable, за да сте сигурни, че вашият клас ще бъде прехвърлен правилно.
  • Ако има променлива, която не може да се сериализира, тогава можете да използвате анотация @transient като тази:
@transient lazy val queue: AmazonSQS = AWSQueueUtils.getClient(config)
  • Ако използвате flatMap или mapPartitions, тогава трябва да следвате практиката по-долу поради използването на TravelsableOne
val result: Int = df.rdd.mapPartitions(p => Iterator {
  // Collect data per partition first
  val partition: Seq[Row] = p.toList
  // Any calculation on partition val
  ...
}).collect().sum

Как да го отстраним?

Важният ключ за разрешаване на този проблем е как да отстраните грешки и да разберете коя точно е променлива или препратка, която е уловила грешката при сериализиране.

Първо, Spark използва SerializationDebugger като дебъгер по подразбиране, за да открие проблемите със сериализацията, но понякога може да се сблъска с JVM грешка

SerializationDebugger:java.lang.StackOverflowError

Можете също да го изключите, като включите флаговете за разширена информация за отстраняване на грешки в JVM, като зададете следните свойства в Конфигурацията на Spark, докато създавате клъстера, или ги добавите към spark-submit параметри

spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true

Второ, ако внедрите вашето задание на Spark в режим YARN, понякога не можете да получите никакъв регистрационен файл (отпечатване в конзолата) за задания на изпълнители. Трябва да зададете yarn.log-aggregation-enable=true в настройките на вашия клъстер.

След това се надявам, че тези бележки могат да ви помогнат да научите и разрешите по-лесно проблема си със сериализацията на Spark. И също така много благодаря на всички автори в приложението по-долу, всичките ви бележки ми помогнаха много да разреша собствения си проблем.

Приложение