Научете как да използвате Spark DataFrames
Наборът от данни е съвкупност от данни, които се разпространяват между множество компютри. Наборите от данни имат предимствата както на RDD (силно въвеждане и възможност за използване на мощни ламбда функции), така и на оптимизираната машина за изпълнение на Spark SQL. Наборите от данни могат да бъдат конструирани от JVM обекти и манипулирани чрез функционални трансформации. API за набор от данни е наличен в Scala и Java, но не и в Python. Въпреки това, поради динамичния характер на Python, много от предимствата на API за набор от данни вече са налични в Python. R е подобен на Python в това отношение.
DataFrame е набор от данни, който е организиран в наименувани колони. DataFrames са концептуално еквивалентни на таблици в релационна база данни или рамки с данни в R/Python, но с по-богати оптимизации под капака. DataFrames могат да бъдат конструирани от широк набор от източници, като файлове със структурирани данни, таблици в Hive, външни бази данни или съществуващи RDD. DataFrame API е наличен в Scala, Java, Python и R. В Scala и Java DataFrame е представен от набор от данни от редове.
DataFrames сега са стандартният начин за работа с данни за Scala и Spark в момента. Те ще бъдат нашият основен тип данни.
Предишна глава:
Класът SparkSession е входната точка в Spark. Ще стартираме сесия всеки път, като просто използваме конструктора на класа.
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate()
Просто започнахме сесия на Spark. Методът getOrCreate на създателя извлича съществуващата сесия, ако съществува, или създава нова. Можете да зададете много параметри, когато създавате нова сесия. Пропуснах ги, за да е просто. Документация на Spark Session Class тук.
Прочети
Сега, след като започнахме сесията, можем да започнем да изследваме DataFrames. Първо, нека създадем DataFrame чрез четене на CSV файл. Точно както в Python, можем да отпечатаме първите редове с метода head.
val df = spark.read.csv("Netflix_2011_2016.csv") df.head(5) //////OUT #column types df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 5 more fields] //rows res0: Array[org.apache.spark.sql.Row] = Array([Date,Open,High,Low,Close,Volume,Adj Close], [2011-10-24,119.100002,120.28000300000001, 115.100004,118.839996,120460200,16.977142], [2011-10-25,74.899999,79.390001,74.249997,77.370002,315541800,11.052857000000001], [2011- 10-26,78.73,81.420001,75.399997,79.400002,148733900,11.342857], [2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,7 1190000,11.551428999999999]) df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 5 more fields]
Трябва малко да коригираме горния код. Както можете да видите, той третира частта, която всъщност трябва да бъде заглавката, като първи ред и смята, че всички стойности са от тип низ.
Ще добавим две опции към метода read. Първият ще обработва заглавките, а вторият ще коригира типовете.
val df = spark.read.option("header","true").option("inferSchema","true").csv("Netflix_2011_2016.csv") //////// df: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 5 more fields] res1: Array[org.apache.spark.sql.Row] = Array([2011-10-24,119.100002,120.28000300000001,115.100004,118.839996,120460200,16.977142], [ 2011-10-25,74.899999,79.390001,74.249997,77.370002,315541800,11.052857000000001], [2011-10-26,78.73,81.420001,75.399997,79.400002,148 733900,11.342857], [2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,71190000,11.551428999999999], [2011-10-28,80.2 80002,84.660002,79.599999,84.14000300000001,57769600,12.02])
Подобно на Python, можем да използваме метода describe за описателна статистика.
df.describe().show() |summary| Date| Open| High| Low| Close| Volume| Adj Clo se| +-------+----------+------------------+------------------+------------------+------------------+--------------------+---------------- --+ | count| 1259| 1259| 1259| 1259| 1259| 1259| 12 59| | mean| null|230.39351086656092|233.97320872915006|226.80127876251044| 230.522453845909|2.5634836060365368E7|55.6105400365368 75| | stddev| null|164.37456353264244| 165.9705082667129| 162.6506358235739|164.40918905512854| 2.306312683388607E7|35.1866693315254 86| | min|2011-10-24| 53.990001| 55.480001| 52.81| 53.8| 3531300| 7.6857 14| | max|2016-10-24| 708.900017| 716.159996| 697.569984| 707.610001| 315541800| 130.9299 93| +-------+----------+------------------+------------------+------------------+------------------+--------------------+---------------- --+
Нека изберем конкретна колона;
df.select("Open").show() #it will display 20 last rows df.select($"Date",$"Open").show() #for multiple cols
За дасъздадем нова колона, използваме метода withColumn. Първият аргумент е името, а вторият аргумент е формулата за добавяне на нови стойности в него.
val df2 = df.withColumn("OpenMinusClose",df("Open")-df("Close")) df2.printSchema() /////////// root |-- Date: string (nullable = true) |-- Open: double (nullable = true) |-- High: double (nullable = true) |-- Low: double (nullable = true) |-- Close: double (nullable = true) |-- Volume: integer (nullable = true) |-- Adj Close: double (nullable = true) |-- OpenMinusClose: double (nullable = true)
Преименуване на колона; (метод as)
df2.select(df2("OpenMinusClose").as("OMC"))
Основни операции
Да започнем сфилтриране; можем да използваме както SQL нотация, така и Scala нотация при филтриране.
За да използваме нотация на Scala, трябва да импортираме неявни
import spark.implicits._ //SCALA notation df.filter($"Open" > 80).show() //SQL notation df.filter("Open > 80").show()
//AND df.filter($"Open" > 80 && $"Close" < 90).show() df.filter("Open > 80 AND Close < 90").show() //EQUAL df.filter($"Low"===83.7).show() df.filter("Low =83.7").show()
Можем да използваме метода collect, за да вземем резултатите и да ги запазим в друга променлива и метода count, за да преброим.
var temp = df.filter($"Open" > 80 && $"Close" < 90).collect() //temp: Array[org.apache.spark.sql.Row] = Array([2011-10-27,82.179998,82.71999699999999,79.249998,80.86000200000001,71190000,11.5514289 99999999], [2011-10-28,80.280002,84.660002,79.599999,84.14000300000001,57769600,12.02], [2011-10-31,83.63999799999999,84.090002,81.45 0002,82.080003,39653600,11.725715], [2011-11-01,80.109998,80.999998,78.74,80.089997,33016200,11.441428], [2011-11-02,80.709998,84.400 002,80.109998,83.389999,41384000,11.912857], [2011-11-09,89.000001,90.440001,87.999998,88.049999,28756000,12.578571], [2011-11-10,89. 290001,90.29999699999999,84.839999,85.11999899999999,39614400,12.16], [2011-11-11,85.899997,87.949997,83.7,87.749999,38140200,12.5357 14], [2011-11-14,87.989998,88.1,85.45,85.719999,21811300,12.245714], [2011-11-15,85.15,87.050003,84.499998... //count var temp = df.filter($"Open" > 80 && $"Close" < 90).count() //temp: Long = 76
Можете дапредадете метод в избрания методи да използвате колони директно в дадения метод. Например, нека проверим корелацията на две колони с помощта на метода corr.
df.select(corr("Open","Close")).show() +------------------+ | corr(Open, Close)| +------------------+ |0.9994211167017522| +------------------+
„За повече вижте документацията.“
Агрегации
Нека използваме други данни за агрегиране.
import org.apache.spark.sql.SparkSession import spark.implicits._ val spark = SparkSession.builder().getOrCreate() val df = spark.read.option("header","true").option("inferSchema","true").csv("Sales.csv") df.printSchema() df.show() // root |-- Company: string (nullable = true) |-- Person: string (nullable = true) |-- Sales: integer (nullable = true) +-------+-------+-----+ |Company| Person|Sales| +-------+-------+-----+ | GOOG| Sam| 200| | GOOG|Charlie| 120| | GOOG| Frank| 340| | MSFT| Tina| 600| | MSFT| Amy| 124| | MSFT|Vanessa| 243| | FB| Carl| 870| | FB| Sarah| 350| +-------+-------+-----+
Можем да направим групиране, което често се използва в SQL, Python, Excel и т.н.
df.groupBy("Company").mean().show() +-------+-----------------+ |Company| avg(Sales)| +-------+-----------------+ | GOOG| 220.0| | FB| 610.0| | MSFT|322.3333333333333| +-------+-----------------+ df.groupBy("Company").min().show() df.groupBy("Company").max().show() df.groupBy("Company").sum().show()
Някои други агрегатни функции са;
df.select(countDistinct("Sales")).show() +---------------------+ |count(DISTINCT Sales)| +---------------------+ | 8| +---------------------+ df.select(sumDistinct("Sales")).show() +-------------------+ |sum(DISTINCT Sales)| +-------------------+ | 2847| +-------------------+ df.select(variance("Sales")).show() +-----------------+ | var_samp(Sales)| +-----------------+ |67235.55357142855| +-----------------+ df.select(stddev("Sales")).show() +------------------+ |stddev_samp(Sales)| +------------------+ |259.29819430807567| +------------------+ df.select(collect_set("Sales")).show() +--------------------+ | collect_set(Sales)| +--------------------+ |[350, 340, 870, 1...| +--------------------+ df.select(sum("Sales")).show() +----------+ |sum(Sales)| +----------+ | 2847| +----------+
Поръчване
df.orderBy("Company").show() df.orderBy($"Company".desc).show()
Обработка на липсващи стойности
Липсващите данни са незаменими за реални проекти. Много методи за машинно обучение също се провалят, когато се натъкнат на липсващи данни. Следователно трябва по някакъв начин да управляваме липсващите данни. Може да искате да разгледате публикацията в блога ми по тази тема (макар и написана на езика на Python).
Сега нека да разгледаме какви операции можем да правим с DataFrames с липсващи данни в Scala.
val spark = SparkSession.builder().getOrCreate() val df = spark.read.option("header","true").option("inferSchema","true").csv("datanull.csv") df.show() +----+-----+------+ | Id| Name| Sales| +----+-----+------+ |emp1|David| null| |emp2| null| null| |emp3| null| 145.0| |emp4| Mano|4312.0| +----+-----+------+
Първата опция, можем да премахнем всички редове, които съдържат нулеви данни.
df.na.drop().show() +----+----+------+ | Id|Name| Sales| +----+----+------+ |emp4|Mano|4312.0| +----+----+------+ //we can drop any rows that have less data than the minimum number that we passed into method df.na.drop(2).show() +----+-----+------+ | Id| Name| Sales| +----+-----+------+ |emp1|David| null| |emp3| null| 145.0| |emp4| Mano|4312.0| +----+-----+------+
Нека попълним липсващите стойности с константа.
df.na.fill(10).show() +----+-----+------+ | Id| Name| Sales| +----+-----+------+ |emp1|David| 10.0| |emp2| null| 10.0| |emp3| null| 145.0| |emp4| Mano|4312.0| +----+-----+------+ df.na.fill("Kramer").show() +----+------+------+ | Id| Name| Sales| +----+------+------+ |emp1| David| null| |emp2|Kramer| null| |emp3|Kramer| 145.0| |emp4| Mano|4312.0| +----+------+------+ //let's fill a specific column df.na.fill("Costanza", Array("Name")).show() +----+--------+------+ | Id| Name| Sales| +----+--------+------+ |emp1| David| null| |emp2|Costanza| null| |emp3|Costanza| 145.0| |emp4| Mano|4312.0| +----+--------+------+
Методът fill изпълнява операцията по попълване, като взема предвид вида на изпратената в него стойност. Тъй като изпратихме число, то не работи с колоната тип низ.
Това е всичко за момента за DataFrames. Докато напредваме, ще използваме DataFrames повече и ще продължим да учим за тях.
Следваща глава:
Прочетете още…
Препратки
https://spark.apache.org/docs/latest/sql-programming-guide.html