Научете как да използвате Spark DataFrames

Наборът от данни е съвкупност от данни, които се разпространяват между множество компютри. Наборите от данни имат предимствата както на RDD (силно въвеждане и възможност за използване на мощни ламбда функции), така и на оптимизираната машина за изпълнение на Spark SQL. Наборите от данни могат да бъдат конструирани от JVM обекти и манипулирани чрез функционални трансформации. API за набор от данни е наличен в Scala и Java, но не и в Python. Въпреки това, поради динамичния характер на Python, много от предимствата на API за набор от данни вече са налични в Python. R е подобен на Python в това отношение.

DataFrame е набор от данни, който е организиран в наименувани колони. DataFrames са концептуално еквивалентни на таблици в релационна база данни или рамки с данни в R/Python, но с по-богати оптимизации под капака. DataFrames могат да бъдат конструирани от широк набор от източници, като файлове със структурирани данни, таблици в Hive, външни бази данни или съществуващи RDD. DataFrame API е наличен в Scala, Java, Python и R. В Scala и Java DataFrame е представен от набор от данни от редове.

DataFrames сега са стандартният начин за работа с данни за Scala и Spark в момента. Те ще бъдат нашият основен тип данни.

Предишна глава:



Класът SparkSession е входната точка в Spark. Ще стартираме сесия всеки път, като просто използваме конструктора на класа.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()

Просто започнахме сесия на Spark. Методът getOrCreate на създателя извлича съществуващата сесия, ако съществува, или създава нова. Можете да зададете много параметри, когато създавате нова сесия. Пропуснах ги, за да е просто. Документация на Spark Session Class тук.

Прочети

Сега, след като започнахме сесията, можем да започнем да изследваме DataFrames. Първо, нека създадем DataFrame чрез четене на CSV файл. Точно както в Python, можем да отпечатаме първите редове с метода head.

val df = spark.read.csv("Netflix_2011_2016.csv")
df.head(5)
//////OUT
#column types
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 5 more fields]
//rows
res0: Array[org.apache.spark.sql.Row] = Array([Date,Open,High,Low,Close,Volume,Adj Close], [2011-10-24,119.100002,120.28000300000001,
115.100004,118.839996,120460200,16.977142], [2011-10-25,74.899999,79.390001,74.249997,77.370002,315541800,11.052857000000001], [2011-
10-26,78.73,81.420001,75.399997,79.400002,148733900,11.342857], [2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,7
1190000,11.551428999999999])
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 5 more fields]

Трябва малко да коригираме горния код. Както можете да видите, той третира частта, която всъщност трябва да бъде заглавката, като първи ред и смята, че всички стойности са от тип низ.

Ще добавим две опции към метода read. Първият ще обработва заглавките, а вторият ще коригира типовете.

val df = spark.read.option("header","true").option("inferSchema","true").csv("Netflix_2011_2016.csv")
////////
df: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 5 more fields]
res1: Array[org.apache.spark.sql.Row] = Array([2011-10-24,119.100002,120.28000300000001,115.100004,118.839996,120460200,16.977142], [
2011-10-25,74.899999,79.390001,74.249997,77.370002,315541800,11.052857000000001], [2011-10-26,78.73,81.420001,75.399997,79.400002,148
733900,11.342857], [2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,71190000,11.551428999999999], [2011-10-28,80.2
80002,84.660002,79.599999,84.14000300000001,57769600,12.02])

Подобно на Python, можем да използваме метода describe за описателна статистика.

df.describe().show()
|summary|      Date|              Open|              High|               Low|             Close|              Volume|         Adj Clo
se|
+-------+----------+------------------+------------------+------------------+------------------+--------------------+----------------
--+
|  count|      1259|              1259|              1259|              1259|              1259|                1259|              12
59|
|   mean|      null|230.39351086656092|233.97320872915006|226.80127876251044|  230.522453845909|2.5634836060365368E7|55.6105400365368
75|
| stddev|      null|164.37456353264244| 165.9705082667129| 162.6506358235739|164.40918905512854| 2.306312683388607E7|35.1866693315254
86|
|    min|2011-10-24|         53.990001|         55.480001|             52.81|              53.8|             3531300|          7.6857
14|
|    max|2016-10-24|        708.900017|        716.159996|        697.569984|        707.610001|           315541800|        130.9299
93|
+-------+----------+------------------+------------------+------------------+------------------+--------------------+----------------
--+

Нека изберем конкретна колона;

df.select("Open").show() #it will display 20 last rows
df.select($"Date",$"Open").show() #for multiple cols

За дасъздадем нова колона, използваме метода withColumn. Първият аргумент е името, а вторият аргумент е формулата за добавяне на нови стойности в него.

val df2 = df.withColumn("OpenMinusClose",df("Open")-df("Close"))
df2.printSchema()
///////////
root
 |-- Date: string (nullable = true)
|-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- OpenMinusClose: double (nullable = true)

Преименуване на колона; (метод as)

df2.select(df2("OpenMinusClose").as("OMC"))

Основни операции

Да започнем сфилтриране; можем да използваме както SQL нотация, така и Scala нотация при филтриране.

За да използваме нотация на Scala, трябва да импортираме неявни

import spark.implicits._
//SCALA notation
df.filter($"Open" > 80).show()
//SQL notation
df.filter("Open > 80").show()

//AND
df.filter($"Open" > 80 && $"Close" < 90).show()
df.filter("Open > 80 AND Close < 90").show()
//EQUAL
df.filter($"Low"===83.7).show()
df.filter("Low =83.7").show()

Можем да използваме метода collect, за да вземем резултатите и да ги запазим в друга променлива и метода count, за да преброим.

var temp = df.filter($"Open" > 80 && $"Close" < 90).collect()
//temp: Array[org.apache.spark.sql.Row] = Array([2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,71190000,11.5514289
99999999], [2011-10-28,80.280002,84.660002,79.599999,84.14000300000001,57769600,12.02], [2011-10-31,83.63999799999999,84.090002,81.45
0002,82.080003,39653600,11.725715], [2011-11-01,80.109998,80.999998,78.74,80.089997,33016200,11.441428], [2011-11-02,80.709998,84.400
002,80.109998,83.389999,41384000,11.912857], [2011-11-09,89.000001,90.440001,87.999998,88.049999,28756000,12.578571], [2011-11-10,89.
290001,90.29999699999999,84.839999,85.11999899999999,39614400,12.16], [2011-11-11,85.899997,87.949997,83.7,87.749999,38140200,12.5357
14], [2011-11-14,87.989998,88.1,85.45,85.719999,21811300,12.245714], [2011-11-15,85.15,87.050003,84.499998...
//count
var temp = df.filter($"Open" > 80 && $"Close" < 90).count()
//temp: Long = 76

Можете дапредадете метод в избрания методи да използвате колони директно в дадения метод. Например, нека проверим корелацията на две колони с помощта на метода corr.

df.select(corr("Open","Close")).show()
+------------------+
| corr(Open, Close)|
+------------------+
|0.9994211167017522|
+------------------+

„За повече вижте документацията.“

Агрегации

Нека използваме други данни за агрегиране.

import org.apache.spark.sql.SparkSession
import spark.implicits._
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header","true").option("inferSchema","true").csv("Sales.csv")
df.printSchema()
df.show()
//
root
|-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: integer (nullable = true)
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|  200|
|   GOOG|Charlie|  120|
|   GOOG|  Frank|  340|
|   MSFT|   Tina|  600|
|   MSFT|    Amy|  124|
|   MSFT|Vanessa|  243|
|     FB|   Carl|  870|
|     FB|  Sarah|  350|
+-------+-------+-----+

Можем да направим групиране, което често се използва в SQL, Python, Excel и т.н.

df.groupBy("Company").mean().show()
+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+
df.groupBy("Company").min().show()
df.groupBy("Company").max().show()
df.groupBy("Company").sum().show()

Някои други агрегатни функции са;

df.select(countDistinct("Sales")).show()
+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                    8|
+---------------------+
df.select(sumDistinct("Sales")).show()
+-------------------+
|sum(DISTINCT Sales)|
+-------------------+
|               2847|
+-------------------+
df.select(variance("Sales")).show()
+-----------------+
|  var_samp(Sales)|
+-----------------+
|67235.55357142855|
+-----------------+
df.select(stddev("Sales")).show()
+------------------+
|stddev_samp(Sales)|
+------------------+
|259.29819430807567|
+------------------+
df.select(collect_set("Sales")).show()
+--------------------+
|  collect_set(Sales)|
+--------------------+
|[350, 340, 870, 1...|
+--------------------+
df.select(sum("Sales")).show()
+----------+
|sum(Sales)|
+----------+
|      2847|
+----------+

Поръчване

df.orderBy("Company").show()
df.orderBy($"Company".desc).show()

Обработка на липсващи стойности

Липсващите данни са незаменими за реални проекти. Много методи за машинно обучение също се провалят, когато се натъкнат на липсващи данни. Следователно трябва по някакъв начин да управляваме липсващите данни. Може да искате да разгледате публикацията в блога ми по тази тема (макар и написана на езика на Python).



Сега нека да разгледаме какви операции можем да правим с DataFrames с липсващи данни в Scala.

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header","true").option("inferSchema","true").csv("datanull.csv")
df.show()
+----+-----+------+
|  Id| Name| Sales|
+----+-----+------+
|emp1|David|  null|
|emp2| null|  null|
|emp3| null| 145.0|
|emp4| Mano|4312.0|
+----+-----+------+

Първата опция, можем да премахнем всички редове, които съдържат нулеви данни.

df.na.drop().show()
+----+----+------+
|  Id|Name| Sales|
+----+----+------+
|emp4|Mano|4312.0|
+----+----+------+
//we can drop any rows that have less data than the minimum number that we passed into method
df.na.drop(2).show()
+----+-----+------+
|  Id| Name| Sales|
+----+-----+------+
|emp1|David|  null|
|emp3| null| 145.0|
|emp4| Mano|4312.0|
+----+-----+------+

Нека попълним липсващите стойности с константа.

df.na.fill(10).show()
+----+-----+------+
|  Id| Name| Sales|
+----+-----+------+
|emp1|David|  10.0|
|emp2| null|  10.0|
|emp3| null| 145.0|
|emp4| Mano|4312.0|
+----+-----+------+
df.na.fill("Kramer").show()
+----+------+------+
|  Id|  Name| Sales|
+----+------+------+
|emp1| David|  null|
|emp2|Kramer|  null|
|emp3|Kramer| 145.0|
|emp4|  Mano|4312.0|
+----+------+------+
//let's fill a specific column
df.na.fill("Costanza", Array("Name")).show()
+----+--------+------+
|  Id|    Name| Sales|
+----+--------+------+
|emp1|   David|  null|
|emp2|Costanza|  null|
|emp3|Costanza| 145.0|
|emp4|    Mano|4312.0|
+----+--------+------+

Методът fill изпълнява операцията по попълване, като взема предвид вида на изпратената в него стойност. Тъй като изпратихме число, то не работи с колоната тип низ.

Това е всичко за момента за DataFrames. Докато напредваме, ще използваме DataFrames повече и ще продължим да учим за тях.

Следваща глава:



Прочетете още…



Препратки

https://spark.apache.org/docs/latest/sql-programming-guide.html