RDD上的高级操作

简介: RDD上的高级操作

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

RDD上的高级操作

3. 实验学时:

4. 实验原理:

Broadcast variables(广播变量)允许将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。广播变量可以用一种高效的方式给每个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以降低通信成本。

 Accumulators(累加器)是一个仅可以执行 added(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且可以添加新的支持类型。

 数据分区是 Spark 在集群中的多个节点之间划分数据的机制。RDD 的每个部分(piece 或 slice)称为分区。例如,当从本地文件系统加载一个文本文件到 Spark 时,文件的内容被分成几个分区,这些分区均匀地分布在集群中的节点上。不止一个分区可能会出现在同一 个节点上。所有这些分区的总和形成您的 RDD。这就是”弹性分布数据集”中”分布”这一词语 的来源。


每个RDD维护一个分区的列表和一个可选的首选位置列表,用于计算分区。可以从RDD的 partitions 字段获得RDD分区的列表。它是一个数组Array,所以可以通过读取它 的 partitions.size 字段来获得RDD分区的数量。

 RDD分区的数量很重要,因为除了影响整个集群中的数据分布之外,还直接决定了将要运行RDD转换的任务的数量。如果这个数字太小,集群将得不到充分利用。此外,可能会导致内存问题,因为工作集可能会变得太大,无法装入 executor 的内存中。建议使用集群中的核的三到四倍的分区。


5. 实验目的:

掌握两种共享变量的应用。

 掌握数据分区与重分区操作。


6. 实验内容:

1、使用Accumulator累加器。

 2、使用Broadcast广播变量。

 3、使用unpersist()更新广播变量。

 4、destroy()释放广播变量。

 5、对数据进行分区和重分区。

 6、自定义分区函数。


7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


8. 实验步骤:

8.1 启动Spark集群

在终端窗口下,输入以下命令,启动Spark集群:


1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。


8.2 启动zeppelin服务器

在终端窗口下,输入以下命令,启动zeppelin服务器:

1.  $ zeppelin-daemon.sh start

然后使用jps命令查看进程,确保zeppelin服务器已经启动。


8.3 创建notebook文档

1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器:

http://localhost:9090

 2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页:


8c5625a6868d4a6594f79087360bd1dd.png

3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示:

2b1dc8d0ebe3491c94fa12a65d244fe5.png


8.4 共享变量

1、Accumulator累加器,顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量。在zeppelin中执行如下代码:


1.  // 创建Accumulator累加器,值为0
2.  var acc_data = sc.accumulator(0)
3.       
4.  // 查看acc_data的数据类型,acc_data.value查看值
5.  println(acc_data.getClass.getSimpleName)
6.  println(acc_data.value)

同时按下【shift+enter】对程序进行输出。输出内容如下所示:

Accumulator
0

2、Accumulator累加器,add,在累加器变量上调用该方法,对累加器进行累加,这个操作只有在excutor上才能执行,各个excetor累加器的值会传递会driver节点进行总的累加,得到最终的累加值,在driver节点通过累加器上value方法获取累加器的值。在zeppelin中执行如下代码:

1.  // 创建Accumulator累加器,值为0
2.  var acc_data = sc.accumulator(0)
3.       
4.  // 生成9个数
5.  val rdd = sc.parallelize(1 to 9)
6.       
7.  val rdd2 = rdd.map(t=> {
8.        acc_data += t
9.        acc_data
10.     })
11.      
12. // 打印rdd2
13. rdd2.collect().foreach(println(_))
14.      
15. // 查看累加器的值
16. print(acc_data.value)

同时按下【shift+enter】对程序进行输出。输出内容如下所示:


1
2
3
4
5
6
7
8
9
45

3、Broadcast广播变量,可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。通常建议共享变量大于5M就是用广播机制,因为这会大大减少网路IO传输的数据量。在广播上调用value属性,就可以获取广播变量的值。在zeppelin中执行如下代码:

1.  // 列表数据
2.  val data = List("dog","cat","dog","cat","cat")
3.  // 创建RDD
4.  val rdd = sc.parallelize(data)
5.       
6.  // 创建字典
7.  val mapper = Map("dog"->1,"cat"->2)
8.  // 声明广播变项
9.  val broadcatVar = sc.broadcast(mapper)
10.      
11. // 通过value获取广播变量的值
12. print(broadcatVar.value)
13.      
14. // 通过get函数获取key对应的value值
15. print(broadcatVar.value.get("dog"))
16.      
17. // 在rdd中使用广播变量
18. rdd.map(t=>{broadcatVar.value.get(t)}).collect()

同时按下【shift+enter】对程序进行输出。输出内容如下所示:


Map(dog -> 1, cat -> 2)
Some(1)
Array(Some(1), Some(2), Some(1), Some(2), Some(2))

4、更新广播变量,使用unpersist()。由于广播变量是只读的,即广播出去的变量没法再修改,利用unpersist函数将老的广播变量删除,然后重新广播一遍新的广播变量。在zeppelin中执行如下代码:

1.  // 创建字典
2.  var mapper = Map("dog"->1,"cat"->2)
3.  // 声明广播变项
4.  var broadcatVar = sc.broadcast(mapper)
5.       
6.  // 更新广播变量
7.  broadcatVar.unpersist()
8.       
9.  // 修改值
10. val mapx = mapper + ("pig"->1);
11. // 重新广播
12. broadcatVar = sc.broadcast(mapx)
13.      
14. // 获取广播变量的值
15. broadcatVar.value

同时按下【shift+enter】对程序进行输出。输出内容如下所示:

Map(dog -> 1, cat -> 2, pig -> 1)

5、destroy()方法用于释放广播变量。该方法调用之后,广播变量就不能被使用了,因为存储广播变量的文件已经删除。在zeppelin中执行如下代码:

1.  // 列表数据
2.  val data = List("dog","cat","dog","cat","cat")
3.  // 创建RDD
4.  val rdd = sc.parallelize(data)
5.       
6.  // 创建字典
7.  var mapper = Map("dog"->1,"cat"->2)
8.  // 声明广播变量
9.  val broadcatVar = sc.broadcast(mapper)
10.      
11. // 广播变量的值
12. println(broadcatVar.value)
13.      
14. // 销毁广播变量
15. broadcatVar.destroy()
16. // 销毁广播变量后的值
17. println(broadcatVar.value)
18.      
19. // 不能通过函数来使用
20. rdd.map(t=>broadcatVar.value.get(t)).collect()

同时按下【shift+enter】对程序进行输出。会报文件找不到的问题。输出内容如下所示:

Map(dog -> 1, cat -> 2)
org.apache.spark.SparkException: Attempted to use Broadcast(364) after it was destroyed (destroy at <console>:43)

8.5 数据分区与重分区

1、创建RDD,并指定RDD的分区数。在zeppelin中执行如下代码:

1.  // 创建RDD
2.  val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),4)
3.       
4.  // 获取RDD分区数
5.  println(rdd1.getNumPartitions)

同时按下【shift+enter】获取RDD的数据,同时得到分区数为4。输出内容如下所示:

4

2、对RDD进行转换并重分区。在zeppelin中执行如下代码:

1.  // 转换得到新的RDD
2.  val rdd2 = rdd1.map(math.pow(_,2))
3.       
4.  // 重新分区
5.  val rdd3=rdd2.repartition(3)
6.  // 获取RDD分区数
7.  println(rdd3.getNumPartitions)

同时按下【shift+enter】,对RDD进行map操作后,对RDD进行重分区,分区结果为3。输出内容如下所示:


3

3、我们也可以通过自定义分区类,来定制分区规则。自定义的分区类必须继承自Partitioner类。下面使用自定义的分区类对数据进行处理并将输出文件保存至HDFS。在zeppelin中执行如下代码:

1.  import org.apache.spark.Partitioner
2.  // 自定义分区
3.  class  MySparkPartition(numParts: Int) extends Partitioner {
4.       
5.    // 自已指定的分区数量
6.    override def numPartitions: Int = numParts
7.       
8.    /**
9.      * 可以自定义分区算法
10.     * @param key
11.     * @return
12.     */
13.   override def getPartition(key: Any): Int = {    
14.     val code = (key.hashCode % numPartitions)
15.      
16.     if (code < 0) {
17.       code + numPartitions
18.     } else {
19.       code
20.     }
21.   }
22.      
23.   //自定义equals方法
24.   override def equals(other: Any): Boolean = other match {
25.     case mypartition: MySparkPartition => mypartition.numPartitions == numPartitions
26.     case _ =>   false
27.   }
28.      
29.   //自定义hashcode方法
30.   override def hashCode: Int = numPartitions
31. }
32.      
33. // 模拟1个分区的数据
34. val data = sc.parallelize(1 to 20,1)
35.      
36. // 根据尾号转变为2个分区,分写到2个文件
37. val result= data.map(t=>(t,1)).partitionBy(new MySparkPartition(2))
38. result.saveAsTextFile("hdfs://localhost:9000/data/dataset/rdd/partition-output1")

同时按下【shift+enter】对自定义分区函数进行输出。

 4、在终端通过命令查看hdfs上面的文件,会发现自动生成了2个分区。

hdfs dfs -ls /data/dataset/rdd/partition-output1

9. 实验结果及分析:


9e463f98dcf648239a96d21b9748705b.png

实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过学习RDD上的持久化操作,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。

这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。


12、 实验知识测试

1、重分区使用的RDD方法正确的是(){单选}

 A、repartition()

 B、getNumPartitions()

 C、broadcast()

 D、partition()


13、实验拓展

给定一个文本文件,分别将文本数据设置5个分区以及10个分区


809628d16f78493697ebfedf8f5a0b3b.png

相关文章
|
6月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
347 1
|
分布式计算 Spark 流计算
191 DStream概述
191 DStream概述
72 0
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
43 4
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
49 0
|
1月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
51 0
|
6月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
6月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
111 0
|
6月前
|
存储 分布式计算 Hadoop
Spark 【RDD编程(一)RDD编程基础】
Spark 【RDD编程(一)RDD编程基础】
|
6月前
|
缓存 分布式计算 Java
Spark【RDD编程(二)RDD编程基础】
Spark【RDD编程(二)RDD编程基础】