Как да извлечете данни от Spark MLlib FP Growth модел

Изпълнявам spark master и slaves в самостоятелен режим, без Hadoop клъстер. Използвайки spark-shell, мога бързо да създам FPGrowthModel с моите данни. След като моделът е изграден, се опитвам да разгледам моделите и честотите, уловени в модела, но искра виси в метода collect() (като гледам потребителския интерфейс на Spark) с по-голям набор от данни (200 000 * 2000 данни, подобни на матрица). Ето кода, който изпълнявам в spark-shell:

import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.rdd.RDD

val textFile = sc.textFile("/path/to/txt/file")
val data = textFile.map(_.split(" ")).cache()

val fpg = new FPGrowth().setMinSupport(0.9).setNumPartitions(8)
val model = fpg.run(data)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

Опитах се да увелича паметта на spark shell от 512MB на 2GB, но не изглежда облекчи проблема с висенето. Не съм сигурен дали това е така, защото Hadoop е необходим, за да изпълни тази задача, или трябва да увелича още повече паметта на spark-shell, или нещо друго.

15/08/10 22:19:40 ERROR TaskSchedulerImpl: Lost executor 0 on 142.103.22.23: remote Rpc client disassociated
15/08/10 22:19:40 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:43440] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/0 is now EXITED (Command exited with code 137)
15/08/10 22:19:40 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 4.0
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Executor app-20150810163957-0001/0 removed: Command exited with code 137
15/08/10 22:19:40 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 59, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 62, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 56, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 58, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 61, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 60, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 7.0 in stage 4.0 (TID 63, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 57, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor added: app-20150810163957-0001/1 on worker-20150810163259-142.103.22.23-48853 (142.103.22.23:48853) with 8 cores
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150810163957-0001/1 on hostPort 142.103.22.23:48853 with 8 cores, 15.0 GB RAM
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now LOADING
15/08/10 22:19:40 INFO DAGScheduler: Executor lost: 0 (epoch 2)
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now RUNNING
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 142.103.22.23, 37411)
15/08/10 22:19:40 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/08/10 22:19:40 INFO ShuffleMapStage: ShuffleMapStage 3 is now unavailable on executor 0 (0/16, false)

person emily    schedule 10.08.2015    source източник
comment
Получавате ли съобщение за грешка? или просто затваря?   -  person Dr VComas    schedule 11.08.2015


Отговори (3)


Не трябва да изпълнявате .collect(), ако наборът от данни е голям, например ако е няколко GB, не трябва да го използвате, това помага да се ускорят нещата за извършване на няколко оценки. Стартирайте цикъла foreach без събиране.

person Dr VComas    schedule 10.08.2015
comment
Здравейте, благодаря за вашата помощ, премахнах метода collect() от кода и намерих отпечатан изход в секцията stdout на потребителския интерфейс на Spark, това е една стъпка напред! Въпреки това, 6 часа по-късно има някои съобщения за грешка в конзолата, не мога да разбера дали моделът е завършил итерацията през всички елементи. Прикачих ги в първоначалния въпрос. Благодаря ти! - person emily; 11.08.2015
comment
Можете първо да опитате с по-малък набор от данни, да видите дали работи, след което ще разберете дали е проблем с ресурсите или какво. - person Dr VComas; 11.08.2015
comment
Благодаря! Мисля, че моделът ми беше твърде голям, филтрирах елементите и ситуацията беше решена - person emily; 11.08.2015

Kryo е по-бърз сериализатор от org.apache.spark.serializer.JavaSerializer. Възможно заобиколно решение е да кажете на Spark да не използва Kryo:

val conf = (new org.apache.spark.SparkConf()
.setAppName("APP_NAME")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

И опитайте да стартирате отново вашия код по-горе.

Вижте тази връзка за справка:

FPGrowth алгоритъм в Spark

person Luis    schedule 28.09.2015
comment
Моля, не публикувайте абсолютно един и същ отговор на няколко въпроса: или не е подходящ за всички, или въпросите са дублирани, които трябва да бъдат маркирани/затворени като такива. - person kleopatra; 28.09.2015
comment
Благодаря и за двата коментара... Ще редактирам отговора, за да включва основни части и ще предоставя връзката за справка. - person Luis; 29.09.2015

Опитайте да замените collect() с локален итератор. В крайна сметка може да се натъкнете на ограничение на изпълнението на FPGrowth. Вижте моята публикация тук и Проблем с Spark JIRA.

person Raj    schedule 12.01.2016