В настоящее время я работаю над заданием PySpark (Spark 2.2.0), которое предназначено для обучения модели скрытого распределения Дирихле на основе набора документов. Входные документы предоставляются в виде файла CSV, расположенного в Google Cloud Storage.
Следующий код успешно выполнялся в кластере Google Cloud Dataproc с одним узлом (4 виртуальных ЦП / 15 ГБ памяти) с небольшим подмножеством документов (~ 6500), небольшим количеством тем для создания (10) и небольшим количеством итераций (100). . Однако другие попытки с большим набором документов или более высокими значениями количества тем или количества итераций быстро приводили к проблемам с памятью и сбоям в работе.
Кроме того, при отправке этого задания в кластер из 4 узлов я мог видеть, что на самом деле работает только один рабочий узел (30% загрузки ЦП), что позволяет мне думать, что код не оптимизирован должным образом для параллельной обработки.
Код
conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True), # id of the document
StructField("cleaned_content", StringType(), True)]) # cleaned text content (used for LDA)
# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)
print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"
print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"
# Step 2: Extracting words
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))
# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)
# Instantiate LDA model
lda = LDA(k=100, # number of topics
optimizer="online", # 'online' or 'em'
maxIter=100,
featuresCol="features",
topicConcentration=0.01, # beta
optimizeDocConcentration=True, # alpha
learningOffset=2.0,
learningDecay=0.8,
checkpointInterval=10,
keepLastCheckpoint=True)
# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)
# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")
Ниже приведены примеры предупреждений и сообщений об ошибках:
WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Вопросы
- Можно ли как-то оптимизировать сам код, чтобы обеспечить его масштабируемость?
- Как мы можем заставить Spark распределить задание по всем рабочим узлам и избежать проблем с памятью?