Как извлечь данные из модели роста Spark MLlib FP

Я использую искровой мастер и ведомые устройства в автономном режиме, без кластера Hadoop. Используя spark-shell, я могу быстро построить FPGrowthModel с моими данными. После того, как модель построена, я пытаюсь просмотреть шаблоны и частоты, зафиксированные в модели, но искра зависает в методе collect() (просматривая пользовательский интерфейс Spark) с большим набором данных (матрица 200000 * 2000, как данные). Вот код, который я запускаю в spark-shell:

import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.rdd.RDD

val textFile = sc.textFile("/path/to/txt/file")
val data = textFile.map(_.split(" ")).cache()

val fpg = new FPGrowth().setMinSupport(0.9).setNumPartitions(8)
val model = fpg.run(data)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

Я попытался увеличить объем памяти искровой оболочки с 512 МБ до 2 ГБ, но проблема с зависанием не исчезла. Я не уверен, что это потому, что для выполнения этой задачи необходим Hadoop, или мне нужно еще больше увеличить память искровой оболочки, или что-то еще.

15/08/10 22:19:40 ERROR TaskSchedulerImpl: Lost executor 0 on 142.103.22.23: remote Rpc client disassociated
15/08/10 22:19:40 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:43440] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/0 is now EXITED (Command exited with code 137)
15/08/10 22:19:40 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 4.0
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Executor app-20150810163957-0001/0 removed: Command exited with code 137
15/08/10 22:19:40 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 59, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 62, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 56, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 58, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 61, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 60, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 7.0 in stage 4.0 (TID 63, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 57, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor added: app-20150810163957-0001/1 on worker-20150810163259-142.103.22.23-48853 (142.103.22.23:48853) with 8 cores
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150810163957-0001/1 on hostPort 142.103.22.23:48853 with 8 cores, 15.0 GB RAM
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now LOADING
15/08/10 22:19:40 INFO DAGScheduler: Executor lost: 0 (epoch 2)
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now RUNNING
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 142.103.22.23, 37411)
15/08/10 22:19:40 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/08/10 22:19:40 INFO ShuffleMapStage: ShuffleMapStage 3 is now unavailable on executor 0 (0/16, false)

person emily    schedule 10.08.2015    source источник
comment
Вы получаете сообщение об ошибке? или просто зависает?   -  person Dr VComas    schedule 11.08.2015


Ответы (3)


Вы не должны запускать .collect(), если набор данных большой, например, если он составляет несколько ГБ, вы не должны его использовать, это помогает ускорить выполнение нескольких оценок. Запустите цикл foreach без сбора.

person Dr VComas    schedule 10.08.2015
comment
Привет, спасибо за вашу помощь, я удалил метод collect () из кода и нашел печатный вывод в разделе Spark UI stdout, это один шаг вперед! Однако через 6 часов в консоли появляются сообщения об ошибках, я не могу сказать, закончила ли модель перебор всех элементов. Я прикрепил их к исходному вопросу. Благодарю вас! - person emily; 11.08.2015
comment
Вы можете сначала попробовать это с меньшим набором данных, посмотреть, работает ли это, а затем выясните, проблема с ресурсами или что-то еще. - person Dr VComas; 11.08.2015
comment
Благодарность! Я думаю, что моя модель была слишком большой, я отфильтровал элементы, и ситуация была улажена. - person emily; 11.08.2015

Kryo — более быстрый сериализатор, чем org.apache.spark.serializer.JavaSerializer. Возможный обходной путь — попросить искру не использовать Kryo:

val conf = (new org.apache.spark.SparkConf()
.setAppName("APP_NAME")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

И попробуйте снова запустить код выше.

См. эту ссылку для справки:

Алгоритм роста FPG в Spark

person Luis    schedule 28.09.2015
comment
Пожалуйста, не публикуйте один и тот же ответ на несколько вопросов: либо он не подходит для всех, либо вопросы являются дубликатами, которые следует пометить / закрыть как таковые. - person kleopatra; 28.09.2015
comment
Спасибо за оба комментария... Я отредактирую ответ, включив в него основные части и предоставив ссылку для справки. - person Luis; 29.09.2015

Попробуйте заменить collect() на локальный итератор. В конечном счете, вы можете столкнуться с ограничением реализации FPGrowth. См. мою публикацию здесь и проблема Spark JIRA.

person Raj    schedule 12.01.2016