Функции передачи Spark при использовании наборов данных, большого класса Java и синглтона

Я прочитал этот пост здесь: https://spark.apache.org/docs/latest/programming-guide.html (см. Передача функций в Spark), но в моем случае используется типизированный набор данных с моими классами case. Я пытаюсь использовать объект singleton для хранения метода сопоставления. Мне интересно, как лучше всего упаковать функции, необходимые мне для оптимизации производительности для моей сцены (преобразование набора данных из одного типа в другой и запись в паркет).

В настоящее время этап stage занимает невероятно много времени, около 3 миллионов строк (~1,5 часа), около 880 МБ данных выводятся в паркет в s3.

Я работаю в кластерном режиме, используя минимальное количество исполнителей = 3, максимальное количество исполнителей = 10, 4 ядра на каждом исполнителе, память драйвера 8 ГБ.

--

Часть кодирования высокого уровня:

Я сопоставляю один класс C1 с другим классом C2. C1 и C2 имеют около 16 полей различных типов, таких как java.sql.Timestamp, Option[String], Option[Int], String, Int, BigInt.

case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)

Чтобы сопоставить C1 с C2, мне нужны функциональные возможности (статические методы) очень большого класса Java J, который я скопировал из https://github.com/drtimcooper./LatLongToTimezone.

public class J {
   public static String getValue((float) v) = ...
}    

Я написал функцию сопоставления внутри служебного класса Util, который имеет много других полезных функций, вызываемых функцией сопоставления.

=========

В основном мой поток кода выглядит так:

case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)

// very large java class J that only contains static methods
public class J {
   public static String getValue((float) v) = ...

   ...
}    

object Util {
  def m1(i: Int): Int = ...

  def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = {
      J.getValue(l.get, l2.get)
  }

  ...

  def convert_C1_to_C2(c1: C1): C2 = {
    C2(
      field1 = m1(c1.field1),
      field2 = m2(c1.field2, c1.field3),
      ...
  }
}

dataframe.as[C1].map(Util.convert_C1_to_C2)
    .mode(SaveMode.Overwrite)
    .parquet("s3a://s3Path")

Есть ли более оптимальный способ написать это? Или может кто-нибудь указать на какие-либо вопиющие ошибки в том, как я это сделал? Глядя на мой код, я не понимаю, почему выполнение задачи занимает так чертовски много времени.

Я уже пытался объединиться, чтобы сказать, 16 разделов, чтобы уменьшить количество файлов в s3, но это, похоже, делает работу намного медленнее. Обычно было бы 64 раздела без объединения.


person pigate    schedule 24.03.2017    source источник


Ответы (1)


Вероятно, вы только что столкнулись с проблемой медленного подделки s3-переименования, охватываемой в другом месте. Некоторые исправления обсуждаются там.

person stevel    schedule 24.03.2017