Как взорвать колонны?

После:

val df = Seq((1, Vector(2, 3, 4)), (1, Vector(2, 3, 4))).toDF("Col1", "Col2")

У меня есть этот DataFrame в Apache Spark:

+------+---------+
| Col1 | Col2    |
+------+---------+
|  1   |[2, 3, 4]|
|  1   |[2, 3, 4]|
+------+---------+

Как преобразовать это в:

+------+------+------+------+
| Col1 | Col2 | Col3 | Col4 |
+------+------+------+------+
|  1   |  2   |  3   |  4   |
|  1   |  2   |  3   |  4   |
+------+------+------+------+

person Jorge Machado    schedule 23.05.2016    source источник


Ответы (4)


Решение, которое не преобразуется в RDD и из него:

df.select($"Col1", $"Col2"(0) as "Col2", $"Col2"(1) as "Col3", $"Col2"(2) as "Col3")

Или спорно лучше:

val nElements = 3
df.select(($"Col1" +: Range(0, nElements).map(idx => $"Col2"(idx) as "Col" + (idx + 2)):_*))

Размер столбца массива Spark не фиксирован, например, вы можете:

+----+------------+
|Col1|        Col2|
+----+------------+
|   1|   [2, 3, 4]|
|   1|[2, 3, 4, 5]|
+----+------------+

Таким образом, невозможно получить количество столбцов и создать их. Если вы знаете, что размер всегда один и тот же, вы можете установить nElements следующим образом:

val nElements = df.select("Col2").first.getList(0).size
person sgvd    schedule 23.05.2016
comment
Хороший ответ. Есть ли способ использовать расширенные API-интерфейсы DataSet для достижения вышеизложенного более безопасным способом? Например, как я могу избавиться от $ Col1? - person Mohammad Adnan; 10.10.2016
comment
Что ты конкретно имеешь ввиду? Я думаю, что не избавиться от него, просто не добавляя его в select :) Я думаю, вы не хотите называть его явно? Но я не уверен, как вы видите безопасность типов, обязательно вступающую в игру. Вероятно, стоит создать новый вопрос с минимальным примером того, что вы хотите сделать. - person sgvd; 10.10.2016
comment
конечно, я добавлю еще один вопрос. но я имею в виду, что DataSet of spark обеспечивает хорошую безопасность времени компиляции. Где $col1 будет оцениваться во время выполнения. Поэтому, если col1 отсутствует в наборе данных, я получаю ошибку времени компиляции в операторе select, а не во время выполнения. - person Mohammad Adnan; 11.10.2016

Просто чтобы дать версию Pyspark ответа sgvd. Если столбец массива находится в Col2, то этот оператор select переместит первые nElements каждого массива в Col2 в их собственные столбцы:

from pyspark.sql import functions as F            
df.select([F.col('Col2').getItem(i) for i in range(nElements)])
person Shane Halloran    schedule 03.11.2017
comment
как бы вы добавили Col1 в оператор select? - person user422930; 08.09.2018
comment
@user422930 user422930, спасибо за ваш вопрос, извините, я увидел его только сейчас. Попробуйте сделать что-то вроде: from pyspark.sql import functions as F df.select([F.col('Col1')]+[F.col('Col2').getItem(i) for i in range(nElements)]) Я не проверял это, но дайте мне знать, работает это или нет. - person Shane Halloran; 25.11.2018

Просто добавьте к решению sgvd:

Если размер не всегда одинаков, вы можете установить nElements следующим образом:

val nElements = df.select(size('Col2).as("Col2_count"))
                  .select(max("Col2_count"))
                  .first.getInt(0)
person Yuan Zhao    schedule 01.08.2017

Вы можете использовать карту:

df.map {
    case Row(col1: Int, col2: mutable.WrappedArray[Int]) => (col1, col2(0), col2(1), col2(2))
}.toDF("Col1", "Col2", "Col3", "Col4").show()
person Carlos Vilchez    schedule 23.05.2016
comment
Что делать, если я не знаю, сколько столбцов у меня в массиве? Я хотел бы иметь что-то вроде взорватьToColums - person Jorge Machado; 23.05.2016