Получение кадра данных apache spark в правильном формате

Я пытаюсь преобразовать некоторые входные данные в формат, который я хочу, в искровом фрейме данных. Ввод, который у меня есть, представляет собой последовательность этого класса case с до 10 000 000 классов (или, возможно, также строку Json, прежде чем я преобразую ее в класс case ..):

case class Element(paramName: String, value: Int, time: Int)

В результате я хочу такой фрейм данных:

|Time | ParamA | ParamB | ParamC | Param 10,000 |  
|1000 | 432432 | 8768768 | Null....... | 75675678622 |  
|2000 | Null.......| Null.........| 734543 | Null................. |  

....
Таким образом, не обязательно определять каждый параметр для всех временных интервалов. Пропущенные значения должны быть заполнены Null. И, вероятно, будет 10 000 параметров и около 1000 временных интервалов.

То, как я это делаю сейчас, кажется очень плохим из-за эффективности:

case class Elements(name: String, value: Int, time: Int)

case class GroupedObjects(time: Int, params: (String, Int)*)

 //elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
  .groupBy(element => element.time)
  .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
    (element.name, element.value)).toSeq: _*))

//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
  "{\"time\":" + obj.time + obj.params.map(tuple => 
     ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)

Проблема, которую я вижу здесь, определенно заключается в изменении обратно на строку только для того, чтобы снова прочитать ее в правильном формате. Я был бы очень рад любой помощи, показывающей мне, как получить класс входных данных в требуемом формате фрейма данных. .


person rincewind    schedule 15.03.2016    source источник


Ответы (2)


Вы можете попытаться построить объекты Row и определить схему RDD вручную, как в следующем примере:

// These extra imports will be required if you don't have them already
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

//elements contains the seq of Element
val elementsRdd = sc.parallelize(elements)

val columnNames = elementsRdd.map(_.name).distinct().collect().sorted

val pivoted = elementsRdd.groupBy(_.time).map {
  case (time, elemsByTime) =>
    val valuesByColumnName = elemsByTime.groupBy(_.name).map {
      case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum)
    }
    val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null))
    (time, allValuesForRow)
}

val schema = StructType(StructField("Time", IntegerType) :: columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList)
val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show(10)

Я попробовал это локально с 10 000 000 элементов, например:

val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000))

И успешно завершается в разумные сроки.

person Steve Willcock    schedule 16.03.2016

Начиная с Spark 1.6 есть функция pivot. Он работает на DataFrames. Поскольку вы используете классы case, это так же просто, как:

val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val elementsDF = elementsRdd.toDF()

Затем вы можете сделать:

elementsDF.groupBy($"time").pivot(...)

См. документацию по GroupedData чтобы узнать больше о pivot(), но этого должно быть более чем достаточно, чтобы продолжить.

person David Griffin    schedule 15.03.2016
comment
Эй, Дэвид, спасибо за ваш ответ. Я только что попробовал это следующим образом: val dfTransformed = df.groupBy("time").pivot("name").sum("value") Сумма просто для возврата кадра данных. Это работает очень хорошо для большого количества временных меток и нескольких параметров (время от 1 до 1000000 и параметры 2), но для многих параметров и меньшего количества временных меток это не очень хорошо работает. Моим вариантом использования будут 1000 временных меток и 10 000 различных параметров. Например, для 100 временных меток и 10 000 это занимает вечность и приводит к исключению пространства кучи java даже быстрее, чем мой первый подход. Я делаю это неправильно? - person rincewind; 16.03.2016
comment
Например, с моим первым подходом я мог сделать 10 000 параметров со 100 временными метками и получил кучу исключений при установке до 1000 временных меток. При использовании сводного подхода я получаю исключение кучи уже при 10 000 параметров и 100 метках времени. - person rincewind; 16.03.2016
comment
Я не думаю, что вы делаете что-то неправильно. Честно говоря, 1.6 является относительно новым - другой ответ, строящий объекты строки на лету, - это способ, с которым я больше знаком. Вы пробовали так? Поскольку это новая функциональность, вы, возможно, выталкиваете ее за рамки того, для чего она была разработана. Возможно, вы захотите задать этот конкретный вопрос - почему pivot намного медленнее, чем другой подход. При этом я понимаю, что Spark предназначен для длинных и тонких данных, а не для коротких и широких данных. Вероятно, не будет весело иметь 1 000 000 столбцов в Spark. - person David Griffin; 16.03.2016
comment
Что, если сначала подсчитать сумму:elemDF.groupBy($"time", $"name").agg(sum($"value") as "value").groupBy($"time").pivot("name").sum("value") - person David Griffin; 16.03.2016