Как правильно распараллелить задание pyspark на нескольких узлах и избежать проблем с памятью?

В настоящее время я работаю над заданием PySpark (Spark 2.2.0), которое предназначено для обучения модели скрытого распределения Дирихле на основе набора документов. Входные документы предоставляются в виде файла CSV, расположенного в Google Cloud Storage.

Следующий код успешно выполнялся в кластере Google Cloud Dataproc с одним узлом (4 виртуальных ЦП / 15 ГБ памяти) с небольшим подмножеством документов (~ 6500), небольшим количеством тем для создания (10) и небольшим количеством итераций (100). . Однако другие попытки с большим набором документов или более высокими значениями количества тем или количества итераций быстро приводили к проблемам с памятью и сбоям в работе.

Кроме того, при отправке этого задания в кластер из 4 узлов я мог видеть, что на самом деле работает только один рабочий узел (30% загрузки ЦП), что позволяет мне думать, что код не оптимизирован должным образом для параллельной обработки.

Код

conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True),  # id of the document
                         StructField("cleaned_content", StringType(), True)])  # cleaned text content (used for LDA)

# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)

print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"

print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"

# Step 2: Extracting words
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))

# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)

# Instantiate LDA model
lda = LDA(k=100,  # number of topics
          optimizer="online", # 'online' or 'em'
          maxIter=100,
          featuresCol="features",
          topicConcentration=0.01,  # beta
          optimizeDocConcentration=True,  # alpha
          learningOffset=2.0,
          learningDecay=0.8,
          checkpointInterval=10,
          keepLastCheckpoint=True)

# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)

# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")

Ниже приведены примеры предупреждений и сообщений об ошибках:

WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Вопросы

  • Можно ли как-то оптимизировать сам код, чтобы обеспечить его масштабируемость?
  • Как мы можем заставить Spark распределить задание по всем рабочим узлам и избежать проблем с памятью?

person Thomas Laporte    schedule 25.08.2017    source источник


Ответы (1)


Если размер ваших входных данных невелик, даже если ваш конвейер в конечном итоге выполняет плотные вычисления с небольшими данными, то секционирование на основе размера приведет к слишком малому количеству секций для масштабируемости. Поскольку ваш getNumPartitions() печатает 1, это означает, что Spark будет использовать не более 1 ядра-исполнителя для обработки этих данных, поэтому вы видите только один рабочий узел.

Вы можете попробовать изменить начальную строку spark.read.csv, добавив в конце repartition:

doc_df = spark.read.csv(path="gs://...", ...).repartition(32)

Затем вы можете убедиться, что он сделал то, что вы ожидали, увидев getNumPartitions() print 32 в последней строке.

person Dennis Huo    schedule 26.08.2017
comment
Я действительно мог убедиться, что увеличение количества разделов позволило Spark разделить задание между несколькими рабочими процессами. Учитывая это, я все еще сталкиваюсь с множеством проблем с памятью (либо Container killed by YARN for exceeding memory limits, либо java.lang.OutOfMemoryError: Java heap space). Файл CSV, взятый в качестве входных данных, весит около 600 МБ. Не могли бы вы посоветовать, как оценить правильное количество разделов по сравнению с размером рабочих узлов (количество ядер ЦП + память)? - person Thomas Laporte; 29.08.2017