1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
练习 Spark实现二次排序
3. 实验学时:
4. 实验原理:
二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
Spark 二次排序解决方案:
我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
5. 实验目的:
掌握Spark RDD实现二次排序的方法。
掌握groupBy、orderBy等算子的使用。
6. 实验内容:
使用Spark RDD实现二次排序。
假设我们有以下输入文件(逗号分割的分别是“年,月,总数”):
1. 2018,5,22 2. 2019,1,24 3. 2018,2,128 4. 2019,3,56 5. 2019,1,3 6. 2019,2,-43 7. 2019,4,5 8. 2019,3,46 9. 2018,2,64 10. 2019,1,4 11. 2019,1,21 12. 2019,2,35 13. 2019,2,0
我们期望的输出结果是:
1. 2018-2 64,128 2. 2018-5 22 3. 2019-1 3,4,21,24 4. 2019-2 -43,0,35 5. 2019-3 46,56 6. 2019-4 5
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 启动Spark集群
在终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。
8.2 启动zeppelin服务器
在终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保zeppelin服务器已经启动。
8.3 创建notebook文档
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
http://localhost:9090
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
8.4 二次排序实现
1、加载数据集。在zeppelin中执行如下代码。
1. val inputPath = "file:///data/dataset/data.txt" 2. val inputRDD = sc.textFile(inputPath)
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
2、实现二次排序。在zeppelin中执行如下代码。
1. // 实现二次排序 2. val sortedRDD = inputRDD 3. .map(line => { 4. val arr = line.split(",") 5. val key = arr(0) + "-" + arr(1) 6. val value = arr(2) 7. (key,value) 8. }) 9. .groupByKey() 10. .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) 11. .sortByKey(true) // true:升序,false:降序
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
3、结果输出。在zeppelin中执行如下代码。
1. sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
1. 2018-2 64,128 2. 2018-5 22 3. 2019-1 3,4,21,24 4. 2019-2 -43,0,35 5. 2019-3 46,56 6. 2019-4 5
8.5 转换为DataFrame输出
也可以将结果转换为DataFrame输出。在zeppelin中执行如下代码。
1. // 加载数据集 2. val inputPath = "file:///data/dataset/data.txt" 3. val inputRDD = sc.textFile(inputPath) 4. 5. // 实现二次排序 6. val sortedRDD = inputRDD 7. .map(line => { 8. val arr = line.split(",") 9. val key = arr(0) + "-" + arr(1) 10. val value = arr(2) 11. (key,value) 12. }) 13. .groupByKey() 14. .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) 15. .sortByKey(true) // true:升序,false:降序 16. 17. // 转换为DataFrame 18. val sortedDF = sortedRDD.toDF("key", "value") 19. 20. // 显示 21. sortedDF.show
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过练习 Spark实现二次排序,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
Spark 二次排序解决方案:
我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。