1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
RDD上的高级操作
3. 实验学时:
4. 实验原理:
Broadcast variables(广播变量)允许将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。广播变量可以用一种高效的方式给每个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以降低通信成本。
Accumulators(累加器)是一个仅可以执行 added(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且可以添加新的支持类型。
数据分区是 Spark 在集群中的多个节点之间划分数据的机制。RDD 的每个部分(piece 或 slice)称为分区。例如,当从本地文件系统加载一个文本文件到 Spark 时,文件的内容被分成几个分区,这些分区均匀地分布在集群中的节点上。不止一个分区可能会出现在同一 个节点上。所有这些分区的总和形成您的 RDD。这就是”弹性分布数据集”中”分布”这一词语 的来源。
每个RDD维护一个分区的列表和一个可选的首选位置列表,用于计算分区。可以从RDD的 partitions 字段获得RDD分区的列表。它是一个数组Array,所以可以通过读取它 的 partitions.size 字段来获得RDD分区的数量。
RDD分区的数量很重要,因为除了影响整个集群中的数据分布之外,还直接决定了将要运行RDD转换的任务的数量。如果这个数字太小,集群将得不到充分利用。此外,可能会导致内存问题,因为工作集可能会变得太大,无法装入 executor 的内存中。建议使用集群中的核的三到四倍的分区。
5. 实验目的:
掌握两种共享变量的应用。
掌握数据分区与重分区操作。
6. 实验内容:
1、使用Accumulator累加器。
2、使用Broadcast广播变量。
3、使用unpersist()更新广播变量。
4、destroy()释放广播变量。
5、对数据进行分区和重分区。
6、自定义分区函数。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 启动Spark集群
在终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。
8.2 启动zeppelin服务器
在终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保zeppelin服务器已经启动。
8.3 创建notebook文档
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器:
http://localhost:9090
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页:
3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示:
8.4 共享变量
1、Accumulator累加器,顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量。在zeppelin中执行如下代码:
1. // 创建Accumulator累加器,值为0 2. var acc_data = sc.accumulator(0) 3. 4. // 查看acc_data的数据类型,acc_data.value查看值 5. println(acc_data.getClass.getSimpleName) 6. println(acc_data.value)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
Accumulator 0
2、Accumulator累加器,add,在累加器变量上调用该方法,对累加器进行累加,这个操作只有在excutor上才能执行,各个excetor累加器的值会传递会driver节点进行总的累加,得到最终的累加值,在driver节点通过累加器上value方法获取累加器的值。在zeppelin中执行如下代码:
1. // 创建Accumulator累加器,值为0 2. var acc_data = sc.accumulator(0) 3. 4. // 生成9个数 5. val rdd = sc.parallelize(1 to 9) 6. 7. val rdd2 = rdd.map(t=> { 8. acc_data += t 9. acc_data 10. }) 11. 12. // 打印rdd2 13. rdd2.collect().foreach(println(_)) 14. 15. // 查看累加器的值 16. print(acc_data.value)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1 2 3 4 5 6 7 8 9 45
3、Broadcast广播变量,可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。通常建议共享变量大于5M就是用广播机制,因为这会大大减少网路IO传输的数据量。在广播上调用value属性,就可以获取广播变量的值。在zeppelin中执行如下代码:
1. // 列表数据 2. val data = List("dog","cat","dog","cat","cat") 3. // 创建RDD 4. val rdd = sc.parallelize(data) 5. 6. // 创建字典 7. val mapper = Map("dog"->1,"cat"->2) 8. // 声明广播变项 9. val broadcatVar = sc.broadcast(mapper) 10. 11. // 通过value获取广播变量的值 12. print(broadcatVar.value) 13. 14. // 通过get函数获取key对应的value值 15. print(broadcatVar.value.get("dog")) 16. 17. // 在rdd中使用广播变量 18. rdd.map(t=>{broadcatVar.value.get(t)}).collect()
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
Map(dog -> 1, cat -> 2) Some(1) Array(Some(1), Some(2), Some(1), Some(2), Some(2))
4、更新广播变量,使用unpersist()。由于广播变量是只读的,即广播出去的变量没法再修改,利用unpersist函数将老的广播变量删除,然后重新广播一遍新的广播变量。在zeppelin中执行如下代码:
1. // 创建字典 2. var mapper = Map("dog"->1,"cat"->2) 3. // 声明广播变项 4. var broadcatVar = sc.broadcast(mapper) 5. 6. // 更新广播变量 7. broadcatVar.unpersist() 8. 9. // 修改值 10. val mapx = mapper + ("pig"->1); 11. // 重新广播 12. broadcatVar = sc.broadcast(mapx) 13. 14. // 获取广播变量的值 15. broadcatVar.value
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
Map(dog -> 1, cat -> 2, pig -> 1)
5、destroy()方法用于释放广播变量。该方法调用之后,广播变量就不能被使用了,因为存储广播变量的文件已经删除。在zeppelin中执行如下代码:
1. // 列表数据 2. val data = List("dog","cat","dog","cat","cat") 3. // 创建RDD 4. val rdd = sc.parallelize(data) 5. 6. // 创建字典 7. var mapper = Map("dog"->1,"cat"->2) 8. // 声明广播变量 9. val broadcatVar = sc.broadcast(mapper) 10. 11. // 广播变量的值 12. println(broadcatVar.value) 13. 14. // 销毁广播变量 15. broadcatVar.destroy() 16. // 销毁广播变量后的值 17. println(broadcatVar.value) 18. 19. // 不能通过函数来使用 20. rdd.map(t=>broadcatVar.value.get(t)).collect()
同时按下【shift+enter】对程序进行输出。会报文件找不到的问题。输出内容如下所示:
Map(dog -> 1, cat -> 2) org.apache.spark.SparkException: Attempted to use Broadcast(364) after it was destroyed (destroy at <console>:43)
8.5 数据分区与重分区
1、创建RDD,并指定RDD的分区数。在zeppelin中执行如下代码:
1. // 创建RDD 2. val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),4) 3. 4. // 获取RDD分区数 5. println(rdd1.getNumPartitions)
同时按下【shift+enter】获取RDD的数据,同时得到分区数为4。输出内容如下所示:
4
2、对RDD进行转换并重分区。在zeppelin中执行如下代码:
1. // 转换得到新的RDD 2. val rdd2 = rdd1.map(math.pow(_,2)) 3. 4. // 重新分区 5. val rdd3=rdd2.repartition(3) 6. // 获取RDD分区数 7. println(rdd3.getNumPartitions)
同时按下【shift+enter】,对RDD进行map操作后,对RDD进行重分区,分区结果为3。输出内容如下所示:
3
3、我们也可以通过自定义分区类,来定制分区规则。自定义的分区类必须继承自Partitioner类。下面使用自定义的分区类对数据进行处理并将输出文件保存至HDFS。在zeppelin中执行如下代码:
1. import org.apache.spark.Partitioner 2. // 自定义分区 3. class MySparkPartition(numParts: Int) extends Partitioner { 4. 5. // 自已指定的分区数量 6. override def numPartitions: Int = numParts 7. 8. /** 9. * 可以自定义分区算法 10. * @param key 11. * @return 12. */ 13. override def getPartition(key: Any): Int = { 14. val code = (key.hashCode % numPartitions) 15. 16. if (code < 0) { 17. code + numPartitions 18. } else { 19. code 20. } 21. } 22. 23. //自定义equals方法 24. override def equals(other: Any): Boolean = other match { 25. case mypartition: MySparkPartition => mypartition.numPartitions == numPartitions 26. case _ => false 27. } 28. 29. //自定义hashcode方法 30. override def hashCode: Int = numPartitions 31. } 32. 33. // 模拟1个分区的数据 34. val data = sc.parallelize(1 to 20,1) 35. 36. // 根据尾号转变为2个分区,分写到2个文件 37. val result= data.map(t=>(t,1)).partitionBy(new MySparkPartition(2)) 38. result.saveAsTextFile("hdfs://localhost:9000/data/dataset/rdd/partition-output1")
同时按下【shift+enter】对自定义分区函数进行输出。
4、在终端通过命令查看hdfs上面的文件,会发现自动生成了2个分区。
hdfs dfs -ls /data/dataset/rdd/partition-output1
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习RDD上的持久化操作,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。
这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。
12、 实验知识测试
1、重分区使用的RDD方法正确的是(){单选}
A、repartition()
B、getNumPartitions()
C、broadcast()
D、partition()
13、实验拓展
给定一个文本文件,分别将文本数据设置5个分区以及10个分区