Как вы распараллеливаете создание RDD/DataFrame в Spark?

Скажем, у меня есть искровая работа, которая выглядит следующим образом:

def loadTable1() {
  val table1 = sqlContext.jsonFile(s"s3://textfiledirectory/")
  table1.cache().registerTempTable("table1")
}  

def loadTable2() {
  val table2 = sqlContext.jsonFile(s"s3://testfiledirectory2/")
  table2.cache().registerTempTable("table2")
} 


def loadAllTables() {
  loadTable1()
  loadTable2()
}

loadAllTables()

Как распараллелить это задание Spark, чтобы обе таблицы создавались одновременно?


person Brandon    schedule 07.07.2015    source источник
comment
Вы также разместили это в списке рассылки user@spark. Если кто-то хочет прочитать обсуждение, тема — Распараллеливание создания нескольких RDD/DataFrame в Spark.   -  person Daniel Darabos    schedule 09.07.2015


Ответы (3)


Вам не нужно распараллеливать это. Операции создания RDD/DF ничего не делают. Эти структуры данных являются ленивыми, поэтому любые фактические вычисления будут происходить только тогда, когда вы начнете их использовать. И когда вычисление Spark произойдет, оно будет автоматически распараллелено (раздел за разделом). Spark распределит работу между исполнителями. Таким образом, вы, как правило, ничего не выиграете, вводя дополнительный параллелизм.

person Daniel Darabos    schedule 08.07.2015
comment
Этот ответ в некотором роде верен, однако, поскольку мы находимся в стране Spark SQL, все становится немного иначе. Операции создания и регистрации фрейма данных на самом деле требуют некоторой работы, особенно с json или подобными структурами, где Spark SQL необходимо сканировать некоторые строки для определения схемы, а не просто читать схему из заголовка. Если мы посмотрим в Catalog.scala, то увидим, что регистрация таблицы требует поиска плана, и это требует некоторого анализа. Мы проверяем это, создавая кадр данных, которого не существует, и видя, что он терпит неудачу (что означает, что оценка не была полностью ленивой). - person Holden; 08.07.2015
comment
Спасибо за комментарий Холден! Я не использую DataFrames, поэтому я не знал, что входит в их создание. Тем не менее, создание должно быть очень дешевым по сравнению с обработкой — я не думаю, что можно много выиграть от распараллеливания. - person Daniel Darabos; 09.07.2015
comment
Интересным вариантом использования было бы, если бы это были перманентные, а не временные таблицы, например, в формате паркета с режимом добавления. В этом случае это совсем не лень, потому что вы можете материализовать множество таблиц как часть процесса ETL. Я пытаюсь сделать что-то подобное. Я понимаю, что каждая загружаемая таблица в любом случае распараллеливается, но если каждая таблица невелика, то registerTempTable не будет создавать много рабочих процессов, поэтому распараллеливание множества загрузок, каждая из которых является последовательной (или минимально параллельной), может быть более эффективное использование ресурсов. - person Davos; 20.04.2017

Используйте фьючерсы!

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

def loadAllTables() {
  Future { loadTable1() }
  Future { loadTable2() }
}
person BAR    schedule 12.10.2015

Вы можете сделать это с помощью стандартного механизма потоковой передачи scala. Лично я хотел бы составить список пар с именем пути и таблицы, а затем выполнить параллельную карту. Вы также можете посмотреть фьючерсы или стандартные потоки.

person Holden    schedule 08.07.2015
comment
Этот ответ не относится к Spark, который работает на нескольких узлах. - person The Archetypal Paul; 08.07.2015
comment
Привет, Пол, это действительно применимо, поскольку регистрация таблиц может быть выполнена только в программе драйвера. - person Holden; 08.07.2015
comment
Нет, это не так. См. другой ответ о лени. - person The Archetypal Paul; 08.07.2015
comment
Я добавил комментарий к другому ответу, объясняя, что, хотя создание RDD совершенно лениво, поскольку мы говорим о фреймах данных (особенно фреймах данных json), оценка не совсем ленива. Вы можете убедиться в этом, попытавшись создать json Dataframe из файла, который не существует, и заметить, что он сразу дает сбой, в отличие от обычного RDD, где он полностью ленив и не сработает, пока мы не выполним действие над ним. Фреймы данных + лень немного отличаются от обычных RDD. - person Holden; 08.07.2015
comment
Тем не менее кажется очевидным, что запуск его в отдельных потоках ничего вам не даст. - person The Archetypal Paul; 08.07.2015
comment
Это правда, что нет большого улучшения, но с регистрацией большого количества файлов Json в виде таблиц вы можете увидеть что-то значимое. - person Holden; 09.07.2015
comment
Давайте продолжим обсуждение в чате. - person Holden; 09.07.2015
comment
В ОП две таблицы! Вы дотягиваетесь :) (и извините, здесь поздно, нет чата) - person The Archetypal Paul; 09.07.2015
comment
Конечно, это, вероятно, не окажет большого влияния на две таблицы, но, насколько нам известно, это большие таблицы json. Это также правильный ответ на случай, если сюда придут будущие люди, которые ищут, как распараллелить создание таблиц. Я мог бы добавить примечание, что чаще всего вам не нужно будет это делать, если, по вашему мнению, это улучшит ответ? - person Holden; 09.07.2015
comment
Это все еще кажется не ответом на вопрос. Он не распараллеливает построение таблицы, а только инициализацию, насколько я понимаю. Но, может быть, нам просто придется согласиться, чтобы не согласиться. - person The Archetypal Paul; 09.07.2015
comment
Он упрощает создание Dataframe (что для json предполагает просмотр данных). Если вы хотите сделать это в чате, мы могли бы вместе рассмотреть некоторые примеры, но кажется, что согласие не соглашаться может быть более простым вариантом. - person Holden; 09.07.2015
comment
Таким образом, правильный ответ для ускорения — использовать один из считывателей JSON, который указывает схему и избегает этого шага. - person The Archetypal Paul; 09.07.2015
comment
Это работает, только если вы знаете схему заранее, что не всегда так. - person Holden; 09.07.2015
comment
Конечно. Но если бы меня беспокоила скорость, я бы очень постарался выяснить это (и я подозреваю, что это почти всегда так. Каков шанс получить доступ к большому набору данных, т. и зная, как его обрабатывать, не зная формата этих данных?) - person The Archetypal Paul; 09.07.2015
comment
Хотя пример OP не таков, я согласен, что есть законные случаи, когда вы хотите распараллелить создание RDD. В нашем коде есть куча RDD внутри фьючерсов. - person Daniel Darabos; 09.07.2015