1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
Pair RDD的操作
3. 实验学时:
4. 实验原理:
一些Spark操作只在键值对的RDDs上可用。Spark在包含key/value对的RDDs上提供了专门的transformation API,包括reduceByKey、groupByKey、sortByKey和join等。
Pair RDDs让我们能够在key上并行操作,或者跨网络重新组织数据。Key/value RDDs常被用于执行聚合操作,以及常被用来完成初始的ETL(extract, transform, load)以获取key/value格式数据。
5. 实验目的:
掌握Pair RDD专属Transformation转换方法。
掌握Pair RDD专属Action行动方法。
掌握Pair RDD的JOIN操作。
6. 实验内容:
1、对Pair RDD执行各种常见转换操作:reduceByKey、groupByKey、sortByKey和join等。
2、对Pair RDD执行各种常见action操作:countByKey、collectAsMap、lookup等。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1,Scala-2.11.11
8. 实验步骤:
8.1 启动Spark集群
在终端窗口下,输入以下命令,启动Spark集群。
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。
8.2 启动zeppelin服务器
在终端窗口下,输入以下命令,启动zeppelin服务器。
1. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保zeppelin服务器已经启动。
8.3 创建notebook文档
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
8.4 创建Pair RDD
创建Pair RDD的方式有多种。
1、第一种创建方式:从文件中加载。在zeppelin中执行以下代码:
1. // 方式1:从文件中加载,使用map转换为pair rdd 2. val hdfsPath = "file:///data/dataset/resources/people.txt" 3. val rdd = sc.textFile(hdfsPath) 4. 5. // 转换 6. val pairRDD = rdd.map(line => line.split(",")).map(arr => (arr(0),arr(1).trim.toInt)) 7. pairRDD.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res45: Array[(String, Int)] = Array((Michael,29), (Andy,30), (Justin,19))
2、第二种创建方式:通过并行集合创建Pair RDD。在zeppelin中执行以下代码:
1. // 方式2:通过并行集合创建Pair RDD 2. val rdd = sc.parallelize(List("Hadoop","Spark","Hive","Spark")) 3. val pairRDD = rdd.map(word => (word,1)) 4. pairRDD.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res139: Array[(String, Int)] = Array((Hadoop,1), (Spark,1), (Hive,1), (Spark,1))
通过应用keyBy函数来生成对应的key。例如,下面的代码使用每个单词的长度为key,返回构造以后的Pair RDD:
1. val rdd1 = sc.parallelize(List("black","blue","white","green","grey"),2) 2. 3. // 通过应用keyBy函数来创建该RDD中元素的元组,返回一个pair RDD 4. val pairRDD1 = rdd1.keyBy(_.length) 5. pairRDD1.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res141: Array[(Int, String)] = Array((5,black), (4,blue), (5,white), (5,green), (4,grey))
8.5 transformation操作
假设有一个pair rdd {(1,2),(3,4),(3,6)}。首先,让我们构造出一个Pair RDD。在zeppelin中执行以下代码:
1. // 构造pair rdd 2. val pairRDD = sc.parallelize(Seq((1,2),(3,4),(3,6))) 3. pairRDD.collect
接下来,学习Pair RDD上的各种转换操作方法:
1、reduceByKey:按照key来合并值,返回新的Pair RDD。在zeppelin中执行如下代码:
1. val p1 = pairRDD.reduceByKey((x,y) => x + y) 2. p1.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res147: Array[(Int, Int)] = Array((1,2), (3,10))
2、groupByKey:按照key进行分组。在zeppelin中执行如下代码:
1. val p2 = pairRDD.groupByKey 2. p2.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res150: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))
3、keys:返回所有的key,返回结果是一个数组。在zeppelin中执行如下代码:
1. val p3 = pairRDD.keys 2. p3.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res153: Array[Int] = Array(1, 3, 3)
4、values。返回所有的值,返回结果是一个数组。在zeppelin中执行如下代码:
1. val p4 = pairRDD.values 2. p4.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res156: Array[Int] = Array(2, 4, 6)
5、sortByKey:按key进行排序。在zeppelin中执行如下代码:
1. // val p5 = pairRDD.sortByKey() // 默认是升序 2. val p5 = pairRDD.sortByKey(false) // 降序 3. p5.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res164: Array[(Int, Int)] = Array((3,4), (3,6), (1,2))
6、mapValues:key不变,对值进行map转换。在zeppelin中执行如下代码:
1. val p6 = pairRDD.mapValues(x => x*x) 2. p6.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res167: Array[(Int, Int)] = Array((1,4), (3,16), (3,36))
7、flatMapValues:key不变,对值先进行map转换,再进行flatten转换。在zeppelin中执行如下代码:
1. val p7 = pairRDD.flatMapValues(x => (x to 5)) 2. p7.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res170: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
8.6 操作多个pair rdd
还可以对两个pair rdd执行集合操作和join连接操作。假设有两个RDD,分别是{(1,2),(3,4),(3,6)}和{(3,9)}。
1、首先构造两个pair rdd。在zeppelin中执行如下代码:
1. // 构造两个pair rdd 2. val pairRDD1 = sc.parallelize(Seq((1,2),(3,4),(3,6))) 3. pairRDD1.collect 4. 5. val pairRDD2 = sc.parallelize(Seq((3,9))) 6. pairRDD2.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res178: Array[(Int, Int)] = Array((1,2), (3,4), (3,6)) res179: Array[(Int, Int)] = Array((3,9))
2、ubstractByKey:按key计算差集。在zeppelin中执行如下代码:
1. val r1 = pairRDD1.subtractByKey(pairRDD2) 2. r1.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res182: Array[(Int, Int)] = Array((1,2))
3、执行join内连接。在zeppelin中执行如下代码:
1. val r2 = pairRDD1.join(pairRDD2) 2. r2.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res185: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))
4、执行join左外连接。在zeppelin中执行如下代码:
1. val r3 = pairRDD1.leftOuterJoin(pairRDD2) 2. r3.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res188: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))
5、执行join右外连接。在zeppelin中执行如下代码:
1. val r4 = pairRDD1.rightOuterJoin(pairRDD2) 2. r4.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res191: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))
6、cogroup: 对来自两个RDD的数据按key进行分组。在zeppelin中执行如下代码:
1. val r5 = pairRDD1.cogroup(pairRDD2) 2. r5.collect
同时按下【shift+enter】,执行以上代码,输出内容如下:
res194: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(9))))
8.7 action操作
所有的传统action操作在pair RDDs上都可以用。除此之外,在pair RDDs上还有一些额外的action可用,以利用数据的key/value性质。
假设有一个pair rdd {(1,2),(3,4),(3,6)}。首先,让我们构造出一个RDD。在zeppelin中执行以下代码:
1. // 构造rdd 2. val pairRDD = sc.parallelize(Seq((1,2),(3,4),(3,6))) 3. pairRDD.collect
接下来,学习Pair RDD上的各种action操作方法:
1、countByKey()操作:countByKey()函数只能在key-value类型的RDDs上可用。它返回一张带有每个key的计数的(K,Int)对的表。在zeppelin中执行如下代码:
1. // countByKey 2. pairRDD.countByKey
同时按下【shift+enter】,执行以上代码,输出内容如下:
res203: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)
2、collectAsMap():将结果返回为Map。在zeppelin中执行如下代码:
1. // collectAsMap 2. pairRDD.collectAsMap
同时按下【shift+enter】,执行以上代码,输出内容如下:
res206: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)
3、lookup()函数:查看指定的元素。在zeppelin中执行如下代码:
1. // lookup(key) 2. pairRDD.lookup(3)
同时按下【shift+enter】,执行以上代码,输出内容如下:
res209: Seq[Int] = WrappedArray(4, 6)
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习RDD的创建方式,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
RDD支持两种类型的操作:transformations和actions。其中:transformations是操作RDD并返回一个新的RDD,如map()和filter()方法,而actions是返回一个结果给驱动程序或将结果写入存储的操作,并开始一个计算,如count()和first()。
Spark对于transformations和actions的方式很不一样,所以理解我们所执行的操作是哪一种很重要。transformations RDD是延迟计算的,只在action时才真正进行计算。许多转换是作用于元素范围内的,也就是一次作用于一个元素。
12、 实验知识测试
1、在RDD中通过key查找数据的方法正确的是©{单选}
A、countByKey()
B、keys()
C、lookup()
D、join()
13、实验拓展
1. 假设有以下数据集合。请编写Spark代码,按key求平均值。
(“panda”,0),(“pink”,3),(“pirate”,3),(“panda”,1),(“pink”,4)