Историята е, когато се опитвам да разреша проблема с големите данни в нашата работа на Spark, която събираше всички данни към възела на драйвера, вместо да получава и обработва данни на всеки дял сега, след което получи грешка при сериализиране
org.apache.spark.SparkException: Task not serializable
Първото нещо, което направих, е да се опитам да намеря подходяща информация за тази грешка в Stack Overflow, Medium и т.н. ... За щастие получих някои съвети (препратки в приложението), за които мисля, че ми помогнаха да науча повече как протича работата на Spark, каква е мързеливата оценка и някои бележки, когато използваме flatMap
или mapPartition
и т.н. ... И ще споделя тук за всеки, който получи същия проблем като мен, може да спести много време за разрешаване на вашия проблем.
Защо се нуждаем от сериализация в задачата на Spark?
Механизмът на Spark ще раздели задание на множество дъщерни задания, изпратени до изпълнителите в клъстерната мрежа на Spark и ще се изпълнява върху това. За да направи това, Spark ще сериализира обекта в байтов поток, който може да бъде върнат обратно в копие на обекта.
И ако има променлива, препратка във вашия обект не може да бъде сериализирана, тогава Spark ще изведе грешка Task not serializable
.
Как да го разрешим?
Трудно е да се каже какво точно е хванало за проблеми със сериализацията, защото зависи от кода на работата ви в Spark. Но има някои съвети, които трябва да опитате:
- Декларирайте вашите променливи и функции вътре в Object колкото е възможно повече. Един обект изглежда като глобална променлива и можем да използваме навсякъде във вашия код.
- Декларирайте собствения си клас за разширяване
Serializable
, за да сте сигурни, че вашият клас ще бъде прехвърлен правилно. - Ако има променлива, която не може да се сериализира, тогава можете да използвате анотация
@transient
като тази:
@transient lazy val queue: AmazonSQS = AWSQueueUtils.getClient(config)
- Ако използвате
flatMap
илиmapPartitions
, тогава трябва да следвате практиката по-долу поради използването наTravelsableOne
val result: Int = df.rdd.mapPartitions(p => Iterator { // Collect data per partition first val partition: Seq[Row] = p.toList // Any calculation on partition val ... }).collect().sum
Как да го отстраним?
Важният ключ за разрешаване на този проблем е как да отстраните грешки и да разберете коя точно е променлива или препратка, която е уловила грешката при сериализиране.
Първо, Spark използва SerializationDebugger като дебъгер по подразбиране, за да открие проблемите със сериализацията, но понякога може да се сблъска с JVM грешка
SerializationDebugger:java.lang.StackOverflowError
Можете също да го изключите, като включите флаговете за разширена информация за отстраняване на грешки в JVM, като зададете следните свойства в Конфигурацията на Spark, докато създавате клъстера, или ги добавите към spark-submit
параметри
spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
Второ, ако внедрите вашето задание на Spark в режим YARN, понякога не можете да получите никакъв регистрационен файл (отпечатване в конзолата) за задания на изпълнители. Трябва да зададете yarn.log-aggregation-enable=true
в настройките на вашия клъстер.
След това се надявам, че тези бележки могат да ви помогнат да научите и разрешите по-лесно проблема си със сериализацията на Spark. И също така много благодаря на всички автори в приложението по-долу, всичките ви бележки ми помогнаха много да разреша собствения си проблем.