Перевести несколько записей одного типа в одну строку

У меня есть входной фрейм данных, в котором я хочу выполнить перевод записей аналогичного типа в одну запись. Например, входной фрейм данных содержит множество записей procdata_*, из которых я хочу только одну запись в выходном фрейме данных, как показано ниже:

Входные данные:

+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|       File_name   |Cycle_date|Status|         Source_time|         Target_time|Source_count|Target_count|Missing_Records|
+-----------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|data_20171223_f.csv|  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:12 |           5|           5|              0|
|data_20180421_f.csv|  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:12 |           5|           4|              1|
|data_20171007_f.csv|  20180911|  PASS|2018-12-05 10:37:12 |2018-12-05 10:37:12 |           6|           4|              2|
|data_20160423_f.csv|  20180911|  PASS|2018-12-05 10:37:14 |2018-12-05 10:37:15 |           4|           4|              0|
|data_20180106_f.csv|  20180911|  PASS|2018-12-05 10:37:15 |2018-12-05 10:37:15 |          10|           9|              1|
|raw_20180120_f.csv |  20180911|  PASS|2018-12-05 10:37:16 |2018-12-05 10:37:17 |          10|          10|              0|
|raw_20171202_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |           2|           2|              0|
|raw_20151219_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |          10|          10|              0|
|raw_20151031_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |           8|           8|              0|
|raw_20170204_f.csv |  20180911|  PASS|2018-12-05 10:37:18 |2018-12-05 10:37:18 |          12|          10|              2|
|eeight.csv         |  20180911|  FAIL|2018-12-05 10:37:18 |2018-12-05 10:37:19 |          10|          10|             10|
+-----------------------+----------+------+--------------------+--------------------+------------+------------+---------------+

Выходные данные:

+-----------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|           File_name   |Cycle_date|Status|         Source_time|         Target_time|Source_count|Target_count|Missing_Records|
+-----------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|data.csv           |  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:15 |          30|          26|              4|
|raw.csv            |  20180911|  PASS|2018-12-05 10:37:16 |2018-12-05 10:37:18 |          42|          40|              2|
|eeight.csv         |  20180911|  FAIL|2018-12-05 10:37:18 |2018-12-05 10:37:19 |          10|          10|              0|
+-----------------------+----------+------+--------------------+--------------------+------------+------------+---------------+

Как этого добиться в Spark?


person user9318576    schedule 05.12.2018    source источник


Ответы (2)


Один из способов решить эту проблему - разделить строки в File_name на _ и оставить только первую часть. Затем выполните groupBy и при необходимости объедините столбцы.

Это можно сделать следующим образом, агрегаты можно изменять под конкретные нужды:

df.withColumn("File_name", concat(split($"File_name", "_|\\.").getItem(0), lit(".csv")))
  .groupBy($"File_name")
  .agg(
      first($"Cycle_date") as "Cycle_date",
      first($"Status") as "Status",
      first($"Source_time") as "Source_time", 
      last($"Target_time") as "Target_time",
      sum($"Source_count") as "Source_count",
      sum($"Target_count") as "Target_count",
      sum($"Missing_Records") as "Missing_Records"
    )

Приведенный выше код также разбивается на ., а затем добавляет часть .csv для удобства, когда в столбце File_name нет _.

person Shaido    schedule 05.12.2018

Вы можете преобразовать столбец File_name, используя регулярные выражения, чтобы получить procdata / rawdata, а затем использовать оконные функции Row number, чтобы выбрать только одну строку. Проверь это:

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(("procdata_20171223_f.csv","20180911","PASS","2018-12-05 10:37:10","2018-12-05 10:37:12","5","5","0"),("procdata_20180421_f.csv","20180911","PASS","2018-12-05 10:37:10","2018-12-05 10:37:12","5","4","1"),("procdata_20171007_f.csv","20180911","PASS","2018-12-05 10:37:12","2018-12-05 10:37:12","6","4","2"),("procdata_20160423_f.csv","20180911","PASS","2018-12-05 10:37:14","2018-12-05 10:37:15","4","4","0"),("procdata_20180106_f.csv","20180911","PASS","2018-12-05 10:37:15","2018-12-05 10:37:15","10","9","1"),("rawdata_20180120_f.csv","20180911","PASS","2018-12-05 10:37:16","2018-12-05 10:37:17","10","10","0"),("rawdata_20171202_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","2","2","0"),("rawdata_20151219_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","10","10","0"),("rawdata_20151031_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","8","8","0"),("rawdata_20170204_f.csv","20180911","PASS","2018-12-05 10:37:18","2018-12-05 10:37:18","12","10","2"),("itemweight.csv","20180911","FAIL","2018-12-05 10:37:18","2018-12-05 10:37:19","10","10","10")).toDF("File_name","Cycle_date","Status","Source_time","Target_time","Source_count","Target_count","Missing_Records")


// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [File_name: string, Cycle_date: string ... 6 more fields]

scala> df.withColumn("File_name",regexp_replace('File_name,"""_.*""",".csv")).withColumn("row1",row_number().over(Window.partitionBy('File_name).orderBy('File_name))).filter(" row1=1").drop("row1").show(false)
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+
|File_name     |Cycle_date|Status|Source_time        |Target_time        |Source_count|Target_count|Missing_Records|
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+
|rawdata.csv   |20180911  |PASS  |2018-12-05 10:37:17|2018-12-05 10:37:18|10          |10          |0              |
|procdata.csv  |20180911  |PASS  |2018-12-05 10:37:14|2018-12-05 10:37:15|4           |4           |0              |
|itemweight.csv|20180911  |FAIL  |2018-12-05 10:37:18|2018-12-05 10:37:19|10          |10          |10             |
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+


scala>
person stack0114106    schedule 05.12.2018