1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
DataFrame的高级操作
3. 实验学时:
4. 实验原理:
Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用此额外信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和Dataset API。在计算结果时,使用相同的执行引擎,与您用于表达计算的API /语言无关。这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供表达给定转换的最自然的方式。
5. 实验目的:
掌握操作DataFrame的聚合、连接、合并等操作。
掌握DataFrame的Pivoting操作。
6. 实验内容:
1、学会使用Spark SQL进行聚合操作。
2、学会使用Spark SQL进行连接操作。
3、学会使用Spark SQL进行合并操作。
4、学会使用 Pivoting 函数。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 启动Spark与zeppelin
1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
2、将本实验要用到的数据集上传到HDFS上。在终端窗口下,输入以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/batch 2. $ hdfs dfs -put /data/dataset/batch/airports.csv /data/dataset/batch
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,如下图所示:
8.2 对DataFrame进行聚合操作
1、下面的代码读取一个航班飞行记录数据集,并创建 DataFrame。
在zeppelin中执行如下代码:
1. // 读取数据集 2. val filePath = "/data/dataset/batch/airports.csv" 3. val flight_summary = spark.read.format("csv").option("header","true").option("inferSchema","true").load(filePath) 4. 5. // 打印数据集列名 6. flight_summary.printSchema 7. 8. //查看原始数据 9. flight_summary.show(5)
结果:
1. root 2. |-- IATA_CODE: string (nullable = true) 3. |-- AIRPORT: string (nullable = true) 4. |-- CITY: string (nullable = true) 5. |-- STATE: string (nullable = true) 6. |-- COUNTRY: string (nullable = true) 7. |-- LATITUDE: double (nullable = true) 8. |-- LONGITUDE: double (nullable = true) 9. 10. +---------+--------------------+-----------+-----+-------+--------+----------+ 11. |IATA_CODE| AIRPORT| CITY|STATE|COUNTRY|LATITUDE| LONGITUDE| 12. +---------+--------------------+-----------+-----+-------+--------+----------+ 13. | ABE|Lehigh Valley Int...| Allentown| PA| USA|40.65236| -75.4404| 14. | ABI|Abilene Regional ...| Abilene| TX| USA|32.41132| -99.6819| 15. | ABQ|Albuquerque Inter...|Albuquerque| NM| USA|35.04022|-106.60919| 16. | ABR|Aberdeen Regional...| Aberdeen| SD| USA|45.44906| -98.42183| 17. | ABY|Southwest Georgia...| Albany| GA| USA|31.53552| -84.19447| 18. +---------+--------------------+-----------+-----+-------+--------+----------+ 19. only showing top 5 rows
2、计算 flight_summary DataFrame 中不同列的数量。
1. flight_summary.select(count("AIRPORT"),count("COUNTRY").as("dest_count")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +--------------+--------------+ 2. |count(AIRPORT)|count(COUNTRY)| 3. +--------------+--------------+ 4. | 322| 322| 5. +--------------+--------------+
3、它只计算每个组的唯一项。
1. //统计总共有多少个机场 2. flight_summary.select(countDistinct("AIRPORT"), countDistinct("CITY"), count("*")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +-----------------------+--------------------+--------+ 2. |count(DISTINCT AIRPORT)|count(DISTINCT CITY)|count(1)| 3. +-----------------------+--------------------+--------+ 4. | 322| 308| 322| 5. +-----------------------+--------------------+--------+
4、近似计算。
approx_count_distinct 函数中实现了该算法的一个版本。因为唯一的计数是一个近似值,所以会有一定数量的误差。这个函数允许为这个用例指定一个可接受估算误差的值。
1. // 统计 flight_summary DataFrame 的"count"列。默认估算误差是 0.05 (5%) 2. flight_summary.select(count("LATITUDE"),countDistinct("LATITUDE"), approx_count_distinct("LATITUDE", 0.05)).show 3. 4. // 感受一下 approx_count_distinct function 与 countDistinct 函数相比有多快 5. // 分别调用这两个函数 6. flight_summary.select(countDistinct("LATITUDE")).show 7. 8. // 指定 1% 估算误差 9. flight_summary.select(approx_count_distinct("LATITUDE", 0.01)).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------------+------------------------+-------------------------------+ 2. |count(LATITUDE)|count(DISTINCT LATITUDE)|approx_count_distinct(LATITUDE)| 3. +---------------+------------------------+-------------------------------+ 4. | 319| 319| 324| 5. +---------------+------------------------+-------------------------------+ 6. 7. +------------------------+ 8. |count(DISTINCT LATITUDE)| 9. +------------------------+ 10. | 319| 11. +------------------------+ 12. 13. +-------------------------------+ 14. |approx_count_distinct(LATITUDE)| 15. +-------------------------------+ 16. | 321| 17. +-------------------------------+
5、统计指定列的最小值和最大值。
1. flight_summary.select(min("LATITUDE"), max("LATITUDE")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +-------------+-------------+ 2. |min(LATITUDE)|max(LATITUDE)| 3. +-------------+-------------+ 4. | 13.48345| 71.28545| 5. +-------------+-------------+
6、计算一个数字列中的值的总和。
1. flight_summary.select(sum("LATITUDE")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +-------------+ 2. |sum(LATITUDE)| 3. +-------------+ 4. | 12435.01681| 5. +-------------+
7、这个函数只汇总了一个数字列的不同值。
flight_summary.select(sumDistinct(“LATITUDE”)).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------------------+ 2. |sum(DISTINCT LATITUDE)| 3. +----------------------+ 4. | 12435.016810000001| 5. +----------------------+
8、计算一个数字列的平均值。
1. flight_summary.select(avg("LATITUDE"), (sum("LATITUDE") / count("LATITUDE"))).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +------------------+---------------------------------+ 2. | avg(LATITUDE)|(sum(LATITUDE) / count(LATITUDE))| 3. +------------------+---------------------------------+ 4. |38.981243918495295| 38.981243918495295| 5. +------------------+---------------------------------+
10、计算 flight_summary DataFrame 中的LATITUDE列的方差和标准偏差。
1. flight_summary.select(variance("LATITUDE"), var_pop("LATITUDE"), stddev("LATITUDE"), stddev_pop("LATITUDE")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +------------------+-----------------+---------------------+--------------------+ 2. |var_samp(LATITUDE)|var_pop(LATITUDE)|stddev_samp(LATITUDE)|stddev_pop(LATITUDE)| 3. +------------------+-----------------+---------------------+--------------------+ 4. | 74.24813207318232|74.01537930806262| 8.616735581018041| 8.603219124726664| 5. +------------------+-----------------+---------------------+--------------------+
8.3 分组聚合
1、在 DataFrame 中的每个子组中执行聚 合。通过分组执行聚合是一个两步的过程。
1. flight_summary.groupBy("AIRPORT").count().show(5, false)
【shift+enter】对程序进行输出。输出结果如下所示:
1. 2. +-------------------------------------------------+-----+ 3. |AIRPORT |count| 4. +-------------------------------------------------+-----+ 5. |Melbourne International Airport |1 | 6. |Eppley Airfield |1 | 7. |San Diego International Airport (Lindbergh Field)|1 | 8. |Kahului Airport |1 | 9. |Austin-Bergstrom International Airport |1 | 10. +-------------------------------------------------+-----+
2、每组多个聚合。有时需要在同一时间对每个组执行多个聚合。例如,除了计数之外,还想知道最小值和最大值。下面按 AIRPORT列分组之后,执行多个聚合。
1. import org.apache.spark.sql.functions._ 2. 3. flight_summary.groupBy("AIRPORT").agg(count("LATITUDE").as("LATITUDE"), 4. min("LATITUDE"), max("LATITUDE"),sum("LATITUDE") ).show(5)
【shift+enter】对程序进行输出。输出结果如下所示:
1. +--------------------+--------+-------------+-------------+-------------+ 2. | AIRPORT|LATITUDE|min(LATITUDE)|max(LATITUDE)|sum(LATITUDE)| 3. +--------------------+--------+-------------+-------------+-------------+ 4. |Melbourne Interna...| 1| 28.10275| 28.10275| 28.10275| 5. | Eppley Airfield| 1| 41.30252| 41.30252| 41.30252| 6. |San Diego Interna...| 1| 32.73356| 32.73356| 32.73356| 7. | Kahului Airport| 1| 20.89865| 20.89865| 20.89865| 8. |Austin-Bergstrom ...| 1| 30.19453| 30.19453| 30.19453| 9. +--------------------+--------+-------------+-------------+-------------+
8.4 join连接
1、对两个DataFrame执行内连接。
1. import sqlContext.implicits._ 2. case class Employee(first_name:String, dept_no:Long) 3. 4. case class Dept(id:Long, name:String) 5. 6. val employeeDF = Seq( Employee("John", 31), Employee("Jeff", 33), 7. Employee("Mary", 33), Employee("Mandy", 34), Employee("Julie", 34), Employee("Kurt", null.asInstanceOf[Int]) 8. ).toDF 9. 10. val deptDF = Seq( Dept(31, "Sales"), Dept(33, "Engineering"), 11. Dept(34, "Finance"), 12. Dept(35, "Marketing") ).toDF 13. 14. // 将它们注册为 view 视图,然后就可以使用 SQL 来执行 join 连接 15. employeeDF.createOrReplaceTempView("employees") 16. 17. deptDF.createOrReplaceTempView("departments") 18. 19. // 定义相等比较的 join 表达式 20. val joinExpression = employeeDF.col("dept_no") === deptDF.col("id") 21. // 执行该 join 22. employeeDF.join(deptDF, joinExpression, "inner").show 23. // 不需要指定该 join 类型,因为"inner"是默认的 24. // employeeDF.join(deptDF, joinExpression).show 25. 26. // 使用 SQL执行join连接 27. spark.sql("select * from employees JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------+-------+---+-----------+ 2. |first_name|dept_no| id| name| 3. +----------+-------+---+-----------+ 4. | John| 31| 31| Sales| 5. | Jeff| 33| 33|Engineering| 6. | Mary| 33| 33|Engineering| 7. | Mandy| 34| 34| Finance| 8. | Julie| 34| 34| Finance| 9. +----------+-------+---+-----------+ 10. 11. +----------+-------+---+-----------+ 12. |first_name|dept_no| id| name| 13. +----------+-------+---+-----------+ 14. | John| 31| 31| Sales| 15. | Jeff| 33| 33|Engineering| 16. | Mary| 33| 33|Engineering| 17. | Mandy| 34| 34| Finance| 18. | Julie| 34| 34| Finance| 19. +----------+-------+---+-----------+
2、对两个DataFrame执行左外连接。
1. // 连接类型既可以是"left_outer",也可以是"leftouter" 2. employeeDF.join(deptDF, 'dept_no === 'id, "left_outer").show 3. 4. // 使用 SQL执行查询 5. spark.sql("select * from employees LEFT OUTER JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示: ```bash 6. +----------+-------+---+-----------+ 7. |first_name|dept_no| id| name| 8. +----------+-------+---+-----------+ 9. | John| 31| 31| Sales| 10. | Jeff| 33| 33|Engineering| 11. | Mary| 33| 33|Engineering| 12. | Mandy| 34| 34| Finance| 13. | Julie| 34| 34| Finance| 14. +----------+-------+---+-----------+ 15. 16. +----------+-------+---+-----------+ 17. |first_name|dept_no| id| name| 18. +----------+-------+---+-----------+ 19. | John| 31| 31| Sales| 20. | Jeff| 33| 33|Engineering| 21. | Mary| 33| 33|Engineering| 22. | Mandy| 34| 34| Finance| 23. | Julie| 34| 34| Finance| 24. +----------+-------+---+-----------+
3、对两个DataFrame执行右外连接。
25. employeeDF.join(deptDF, 'dept_no === 'id, "right_outer").show 26. 27. // 使用 SQL执行查询 28. spark.sql("select * from employees RIGHT OUTER JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示:
29. +----------+-------+---+-----------+ 30. |first_name|dept_no| id| name| 31. +----------+-------+---+-----------+ 32. | John| 31| 31| Sales| 33. | Mary| 33| 33|Engineering| 34. | Jeff| 33| 33|Engineering| 35. | Julie| 34| 34| Finance| 36. | Mandy| 34| 34| Finance| 37. | null| null| 35| Marketing| 38. +----------+-------+---+-----------+ 39. 40. +----------+-------+---+-----------+ 41. |first_name|dept_no| id| name| 42. +----------+-------+---+-----------+ 43. | John| 31| 31| Sales| 44. | Mary| 33| 33|Engineering| 45. | Jeff| 33| 33|Engineering| 46. | Julie| 34| 34| Finance| 47. | Mandy| 34| 34| Finance| 48. | null| null| 35| Marketing| 49. +----------+-------+---+-----------+
4、对两个DataFrame执行全外连接。这种 join 类型的行为实际上与将左外连接和右外连接的结果结合起来是一样的。
52. employeeDF.join(deptDF, 'dept_no === 'id, "outer").show 53. 54. // 使用 SQL执行查询 55. spark.sql("select * from employees FULL OUTER JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------+-------+----+-----------+ 2. |first_name|dept_no| id| name| 3. +----------+-------+----+-----------+ 4. | Kurt| 0|null| null| 5. | Mandy| 34| 34| Finance| 6. | Julie| 34| 34| Finance| 7. | John| 31| 31| Sales| 8. | Jeff| 33| 33|Engineering| 9. | Mary| 33| 33|Engineering| 10. | null| null| 35| Marketing| 11. +----------+-------+----+-----------+ 12. 13. +----------+-------+----+-----------+ 14. |first_name|dept_no| id| name| 15. +----------+-------+----+-----------+ 16. | Kurt| 0|null| null| 17. | Mandy| 34| 34| Finance| 18. | Julie| 34| 34| Finance| 19. | John| 31| 31| Sales| 20. | Jeff| 33| 33|Engineering| 21. | Mary| 33| 33|Engineering| 22. | null| null| 35| Marketing| 23. +----------+-------+----+-----------+
5、Left Anti-Join。这种 join 类型使您能够发现来自左边数据集的哪些行在右边的数据集上没有任何匹配 的行,而连接后的数据集只包含来自左边数据集的列。
1. employeeDF.join(deptDF, 'dept_no === 'id, "left_anti").show 2. 3. // 使用 SQL执行查询 4. spark.sql("select * from employees LEFT ANTI JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------+-------+ 2. |first_name|dept_no| 3. +----------+-------+ 4. | Kurt| 0| 5. +----------+-------+ 6. 7. +----------+-------+ 8. |first_name|dept_no| 9. +----------+-------+ 10. | Kurt| 0| 11. +----------+-------+
6、Left Semi-Join。这种 join 类型的行为类似于内连接类型,除了连接后的数据集不包括来自右边数据集 的列。另一种考虑这种 join 类型的方法是它的行为与 left anti-join 相反,在这里,连接后的 数据集只包含匹配的行。
1. employeeDF.join(deptDF, 'dept_no === 'id, "left_semi").show 2. 3. // 使用 SQL执行查询 4. spark.sql("select * from employees LEFT SEMI JOIN departments on dept_no == id").show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------+-------+ 2. |first_name|dept_no| 3. +----------+-------+ 4. | John| 31| 5. | Jeff| 33| 6. | Mary| 33| 7. | Mandy| 34| 8. | Julie| 34| 9. +----------+-------+ 10. 11. +----------+-------+ 12. |first_name|dept_no| 13. +----------+-------+ 14. | John| 31| 15. | Jeff| 33| 16. | Mary| 33| 17. | Mandy| 34| 18. | Julie| 34| 19. +----------+-------+
7、交叉连接 (又称为 Cartesian-笛卡尔连接)。
1. // 使用 crossJoin transformation 并显示该 count 2. employeeDF.crossJoin(deptDF).count // Long = 24 3. 4. // 使用 SQL,并显示前 30 行以观察连接后的数据集中所有的行 5. spark.sql("select * from employees CROSS JOIN departments").show(30)
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------+-------+---+-----------+ 2. |first_name|dept_no| id| name| 3. +----------+-------+---+-----------+ 4. | John| 31| 31| Sales| 5. | John| 31| 33|Engineering| 6. | John| 31| 34| Finance| 7. | John| 31| 35| Marketing| 8. | Jeff| 33| 31| Sales| 9. | Jeff| 33| 33|Engineering| 10. | Jeff| 33| 34| Finance| 11. | Jeff| 33| 35| Marketing| 12. | Mary| 33| 31| Sales| 13. | Mary| 33| 33|Engineering| 14. | Mary| 33| 34| Finance| 15. | Mary| 33| 35| Marketing| 16. | Mandy| 34| 31| Sales| 17. | Mandy| 34| 33|Engineering| 18. | Mandy| 34| 34| Finance| 19. | Mandy| 34| 35| Marketing| 20. | Julie| 34| 31| Sales| 21. | Julie| 34| 33|Engineering| 22. | Julie| 34| 34| Finance| 23. | Julie| 34| 35| Marketing| 24. | Kurt| 0| 31| Sales| 25. | Kurt| 0| 33|Engineering| 26. | Kurt| 0| 34| Finance| 27. | Kurt| 0| 35| Marketing| 28. +----------+-------+---+-----------+
8.5 Pivoting 函数
1、Pivoting 函数。在下面的例子中,在一个小数据集上,包含学生信息,每行包含学生姓名、性别、 体重、毕业年份。现在想要知道每个毕业年份每个性别的平均体重:
1. import sqlContext.implicits._ 2. 3. case class Student(name:String, gender:String, weight:Int, graduation_year:Int) 4. 5. // 转为 DataFrame 6. val studentsDF = Seq(Student("John", "M", 180, 2015), 7. Student("Mary", "F", 110, 2015), Student("Derek", "M", 200, 2015), Student("Julie", "F", 109, 2015), Student("Allison", "F", 105, 2015), Student("kirby", "F", 115, 2016), Student("Jeff", "M", 195, 2016)).toDF 8. 9. // 计算每年每个性别的平均体重 10. studentsDF.groupBy("graduation_year").pivot("gender").avg("weight").show()
【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------------+-----+-----+ 2. |graduation_year| F| M| 3. +---------------+-----+-----+ 4. | 2015|108.0|190.0| 5. | 2016|115.0|195.0| 6. +---------------+-----+-----+
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习DataFrame的高级操作,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
经过本节实验的学习,通过学习DataFrame的聚合、连接、合并以及DataFrame的Pivoting操作,我们可以使用Spark SQL更好的去分析数据。