Я пытаюсь преобразовать некоторые входные данные в формат, который я хочу, в искровом фрейме данных. Ввод, который у меня есть, представляет собой последовательность этого класса case с до 10 000 000 классов (или, возможно, также строку Json, прежде чем я преобразую ее в класс case ..):
case class Element(paramName: String, value: Int, time: Int)
В результате я хочу такой фрейм данных:
|Time | ParamA | ParamB | ParamC | Param 10,000 |
|1000 | 432432 | 8768768 | Null....... | 75675678622 |
|2000 | Null.......| Null.........| 734543 | Null................. |
....
Таким образом, не обязательно определять каждый параметр для всех временных интервалов. Пропущенные значения должны быть заполнены Null. И, вероятно, будет 10 000 параметров и около 1000 временных интервалов.
То, как я это делаю сейчас, кажется очень плохим из-за эффективности:
case class Elements(name: String, value: Int, time: Int)
case class GroupedObjects(time: Int, params: (String, Int)*)
//elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
.groupBy(element => element.time)
.map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
(element.name, element.value)).toSeq: _*))
//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
"{\"time\":" + obj.time + obj.params.map(tuple =>
",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)
Проблема, которую я вижу здесь, определенно заключается в изменении обратно на строку только для того, чтобы снова прочитать ее в правильном формате. Я был бы очень рад любой помощи, показывающей мне, как получить класс входных данных в требуемом формате фрейма данных. .