Collapse multiple records of a similar type into a single row

I have an input dataframe in which I want to collapse records of a similar type into a single record. For example, the input dataframe contains many `procdata_*` records, and I want only one record for them in the output dataframe, as shown below:

Input dataframe:

+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|       File_name   |Cycle_date|Status|         Source_time|         Target_time|Source_count|Target_count|Missing_Records|
+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|data_20171223_f.csv|  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:12 |           5|           5|              0|
|data_20180421_f.csv|  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:12 |           5|           4|              1|
|data_20171007_f.csv|  20180911|  PASS|2018-12-05 10:37:12 |2018-12-05 10:37:12 |           6|           4|              2|
|data_20160423_f.csv|  20180911|  PASS|2018-12-05 10:37:14 |2018-12-05 10:37:15 |           4|           4|              0|
|data_20180106_f.csv|  20180911|  PASS|2018-12-05 10:37:15 |2018-12-05 10:37:15 |          10|           9|              1|
|raw_20180120_f.csv |  20180911|  PASS|2018-12-05 10:37:16 |2018-12-05 10:37:17 |          10|          10|              0|
|raw_20171202_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |           2|           2|              0|
|raw_20151219_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |          10|          10|              0|
|raw_20151031_f.csv |  20180911|  PASS|2018-12-05 10:37:17 |2018-12-05 10:37:18 |           8|           8|              0|
|raw_20170204_f.csv |  20180911|  PASS|2018-12-05 10:37:18 |2018-12-05 10:37:18 |          12|          10|              2|
|eeight.csv         |  20180911|  FAIL|2018-12-05 10:37:18 |2018-12-05 10:37:19 |          10|          10|             10|
+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+

Output dataframe:

+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|       File_name   |Cycle_date|Status|         Source_time|         Target_time|Source_count|Target_count|Missing_Records|
+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+
|data.csv           |  20180911|  PASS|2018-12-05 10:37:10 |2018-12-05 10:37:15 |          30|          26|              4|
|raw.csv            |  20180911|  PASS|2018-12-05 10:37:16 |2018-12-05 10:37:18 |          42|          40|              2|
|eeight.csv         |  20180911|  FAIL|2018-12-05 10:37:18 |2018-12-05 10:37:19 |          10|          10|              0|
+-------------------+----------+------+--------------------+--------------------+------------+------------+---------------+

How can this be achieved in Spark?


person user9318576    schedule 05.12.2018    source


Answers (2)


One way to solve this would be to split the strings in `File_name` on `_` and keep only the first part. Then run a `groupBy` and aggregate the columns as needed.

It can be done as follows; the aggregates can be adjusted to fit specific needs:

import org.apache.spark.sql.functions._  // concat, split, lit, first, last, sum
import spark.implicits._                 // enables the $"col" column syntax

df.withColumn("File_name", concat(split($"File_name", "_|\\.").getItem(0), lit(".csv")))
  .groupBy($"File_name")
  .agg(
      first($"Cycle_date") as "Cycle_date",
      first($"Status") as "Status",
      first($"Source_time") as "Source_time",
      last($"Target_time") as "Target_time",
      sum($"Source_count") as "Source_count",
      sum($"Target_count") as "Target_count",
      sum($"Missing_Records") as "Missing_Records"
    )

The code above also splits on `.` and appends the `.csv` part back afterwards, as a convenience for when there is no `_` in the `File_name` column.
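
To see what that split expression produces on its own, here is a plain-Scala sketch of the same logic (no Spark needed); the sample file names are taken from the question's input table. The pattern `"_|\\."` splits on either an underscore or a literal dot, so element 0 is the common prefix:

```scala
// Plain Scala sketch of the split-and-rebuild step used above.
val names = Seq("data_20171223_f.csv", "raw_20180120_f.csv", "eeight.csv")
val prefixes = names.map(n => n.split("_|\\.")(0) + ".csv")
// prefixes: Seq("data.csv", "raw.csv", "eeight.csv")
```

Note that a name without any `_`, such as `eeight.csv`, still splits on the `.` and is rebuilt unchanged, which is why the `.csv` suffix is re-appended.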

person Shaido    schedule 05.12.2018

You can transform the `File_name` column using a regular expression to get `procdata`/`rawdata`, and then use the `row_number` window function to pick just one row per group. See this:

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(("procdata_20171223_f.csv","20180911","PASS","2018-12-05 10:37:10","2018-12-05 10:37:12","5","5","0"),("procdata_20180421_f.csv","20180911","PASS","2018-12-05 10:37:10","2018-12-05 10:37:12","5","4","1"),("procdata_20171007_f.csv","20180911","PASS","2018-12-05 10:37:12","2018-12-05 10:37:12","6","4","2"),("procdata_20160423_f.csv","20180911","PASS","2018-12-05 10:37:14","2018-12-05 10:37:15","4","4","0"),("procdata_20180106_f.csv","20180911","PASS","2018-12-05 10:37:15","2018-12-05 10:37:15","10","9","1"),("rawdata_20180120_f.csv","20180911","PASS","2018-12-05 10:37:16","2018-12-05 10:37:17","10","10","0"),("rawdata_20171202_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","2","2","0"),("rawdata_20151219_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","10","10","0"),("rawdata_20151031_f.csv","20180911","PASS","2018-12-05 10:37:17","2018-12-05 10:37:18","8","8","0"),("rawdata_20170204_f.csv","20180911","PASS","2018-12-05 10:37:18","2018-12-05 10:37:18","12","10","2"),("itemweight.csv","20180911","FAIL","2018-12-05 10:37:18","2018-12-05 10:37:19","10","10","10")).toDF("File_name","Cycle_date","Status","Source_time","Target_time","Source_count","Target_count","Missing_Records")


// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [File_name: string, Cycle_date: string ... 6 more fields]

scala> df.withColumn("File_name",regexp_replace('File_name,"""_.*""",".csv")).withColumn("row1",row_number().over(Window.partitionBy('File_name).orderBy('File_name))).filter(" row1=1").drop("row1").show(false)
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+
|File_name     |Cycle_date|Status|Source_time        |Target_time        |Source_count|Target_count|Missing_Records|
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+
|rawdata.csv   |20180911  |PASS  |2018-12-05 10:37:17|2018-12-05 10:37:18|10          |10          |0              |
|procdata.csv  |20180911  |PASS  |2018-12-05 10:37:14|2018-12-05 10:37:15|4           |4           |0              |
|itemweight.csv|20180911  |FAIL  |2018-12-05 10:37:18|2018-12-05 10:37:19|10          |10          |10             |
+--------------+----------+------+-------------------+-------------------+------------+------------+---------------+
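
Note that the window approach above keeps a single row per group rather than summing the counts. As a hedged variant (not part of the original answer): if the counts should be summed per group, as in the question's expected output, the same `regexp_replace` column could instead feed a `groupBy`/`agg`, along the lines of:

```scala
import org.apache.spark.sql.functions._  // regexp_replace, first, min, max, sum
import spark.implicits._                 // enables the 'col symbol syntax

df.withColumn("File_name", regexp_replace('File_name, """_.*""", ".csv"))
  .groupBy('File_name)
  .agg(
      first('Cycle_date) as "Cycle_date",
      first('Status) as "Status",
      min('Source_time) as "Source_time",
      max('Target_time) as "Target_time",
      sum('Source_count) as "Source_count",
      sum('Target_count) as "Target_count",
      sum('Missing_Records) as "Missing_Records"
    )
  .show(false)
```

Here `min`/`max` are used for the time columns so the result does not depend on row order; since the count columns in the example `df` are strings, Spark casts them implicitly when summing.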


person stack0114106    schedule 05.12.2018