Как перенести фрейм данных в Spark?

У меня есть следующий кадр данных:

+--------------------+---+---+-----+----+--------+----+
|                  ak| 1 | 2 |  3  | 4  |   5    |  6 |
+--------------------+---+---+-----+----+--------+----+
|8dce120638dbdf438   |  2|  1|    0|   0|       0|   0|
|3fd28484316249e95   |  1|  0|    3|   1|       4|   5|
|3636b43f64db33889   |  9|  3|    3|   4|      18|  11|
+--------------------+---+---+-----+----+--------+----+

и я хочу перенести его на следующее:

ak                 depth    user_count
8dce120638dbdf438    1       2
8dce120638dbdf438    2       1
8dce120638dbdf438    3       0
8dce120638dbdf438    4       0
8dce120638dbdf438    5       0
8dce120638dbdf438    6       0
3fd28484316249e95    1       1
3fd28484316249e95    2       0
3fd28484316249e95    3       3
3fd28484316249e95    4       1
3fd28484316249e95    5       4
3fd28484316249e95    6       5
3fd28484316249e95    1       9
3fd28484316249e95    2       3
3fd28484316249e95    3       3
3fd28484316249e95    4       4
3fd28484316249e95    5       18
3fd28484316249e95    6       11

Как это сделать на Скале?


person chenyf    schedule 03.03.2018    source источник


Ответы (2)


Аналогичный подход к @Ramesh Maharjan, но без использования UDF - вместо этого используются встроенные функции Spark array и struct для создания аналогичного массива, который можно взорвать:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._

// per column name, create a struct (similar to a tuple) of the column name and value:
def arrayItem(name: String) = struct(lit(name) cast IntegerType as "depth", $"$name" as "user_count")

// create an array of these per column, explode it and select the relevant columns:
df.withColumn("tmp", explode(array(df.columns.tail.map(arrayItem): _*)))
  .select($"ak", $"tmp.depth", $"tmp.user_count")
person Tzach Zohar    schedule 03.03.2018

Решение кажется простым: собрать значения с именами столбцов в виде массива, затем использовать функцию разнесения, чтобы разделить каждый элемент массива на отдельные строки, а затем, наконец, разделить ключ и значение на отдельные столбцы.

Обобщение приведенного выше объяснения в код с объяснением приведено ниже.

val columns = df.columns.tail   //selecting columns to be changed to rows

import org.apache.spark.sql.functions._
//defining udf for zipping the column names with value and returning as array of column names zipped with column values
def zipUdf = udf((cols: collection.mutable.WrappedArray[String], vals: collection.mutable.WrappedArray[String]) => cols.zip(vals))

df.select(col("ak"), zipUdf(lit(columns), array(columns.map(col): _*)).as("depth"))   //calling udf function above
    .withColumn("depth", explode(col("depth")))                                       //exploding the array column to be on separate rows
    .select(col("ak"), col("depth._1").as("depth"), col("depth._2").as("user_count")) //selecting columns as required in output
  .show(false)

У вас должен быть следующий вывод

+-----------------+-----+----------+
|ak               |depth|user_count|
+-----------------+-----+----------+
|8dce120638dbdf438|1    |2         |
|8dce120638dbdf438|2    |1         |
|8dce120638dbdf438|3    |0         |
|8dce120638dbdf438|4    |0         |
|8dce120638dbdf438|5    |0         |
|8dce120638dbdf438|6    |0         |
|3fd28484316249e95|1    |1         |
|3fd28484316249e95|2    |0         |
|3fd28484316249e95|3    |3         |
|3fd28484316249e95|4    |1         |
|3fd28484316249e95|5    |4         |
|3fd28484316249e95|6    |5         |
|3636b43f64db33889|1    |9         |
|3636b43f64db33889|2    |3         |
|3636b43f64db33889|3    |3         |
|3636b43f64db33889|4    |4         |
|3636b43f64db33889|5    |18        |
|3636b43f64db33889|6    |11        |
+-----------------+-----+----------+
person Ramesh Maharjan    schedule 03.03.2018