Работата на Spark отнема много време # Код или проблем със средата?

имаме клъстер от 300 възела, като всеки възел има 132gb памет и 20 ядра. запитването е - премахнете данните от таблица A, която е в таблица B и след това обединете B с A и натиснете A към teradata.

по-долу е кодът

val ofitemp = sqlContext.sql("select * from B")
val ofifinal = sqlContext.sql("select * from A")
val selectfromfinal = sqlContext.sql("select A.a,A.b,A.c...A.x from A where A.x=B.x")
val takefromfinal = ofifinal.except(selectfromfinal)
val tempfinal = takefromfinal.unionAll(ofitemp)tempfinal.write.mode("overwrite").saveAsTable("C")
val tempTableFinal = sqlContext.table("C")tempTableFinal.write.mode("overwrite").insertInto("A")

конфигурацията, използвана за стартиране на spark, е -

EXECUTOR_MEM="16G"
HIVE_MAPPER_HEAP=2048   ## MB
NUMBER_OF_EXECUTORS="25"
DRIVER_MEM="5G"
EXECUTOR_CORES="3"

тъй като A и B имат няколко милиона записа, изпълнението на работата отнема няколко часа. Тъй като съм много нов в Spark, не разбирам - проблемът с кода ли е или проблемът с настройката на средата.

ще Ви бъда задължен, ако можете да споделите мислите си за преодоляване на проблемите с производителността.


person Sugata Kar    schedule 24.10.2018    source източник


Отговори (1)


Във вашия код except може да е тясно място, защото сравнява всички колони за равенство. Това наистина ли е това, от което се нуждаете (объркан съм относно присъединяването À.x=B.y`в реда преди)

Ако трябва да проверите само 1 атрибут, най-бързият начин би бил да направите "leftanti"-join:

val takefromfinal = ofifinal.join(ofitemp,$"A.x"===$"B.y","leftanti")

Освен това, проучете spark-UI и идентифицирайте тясното място

person Raphael Roth    schedule 24.10.2018