请看下面的例子(根据相同键,计算其所有值的平均值):
val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5)))
val result = cbRDD.combineByKey( // 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对 (v) => (v, 1), // 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // 多个分区遇到同一个键的累加器,更新主累加器 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 求平均值 ).map{ case (key, value) => (key, value._1 / value._2.toFloat) } // 输出结果 result.collectAsMap().foreach(println(_))
遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)。
遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)。
遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)。
以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。
最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。
最后打印结果,如图:
【9】subtractByKey()
含义:删掉 rdd1中的键
与 rdd2的键
相同的元素。
val rdd1= sc.parallelize(List((1,2),(2,3),(2,4))) val rdd2= sc.parallelize(List((2,4),(3,5))) rdd1.subtractByKey(rdd2).collect()
代码解释:rdd1中有键为2
的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4)
,最后剩下一个元素(1,2)
。
【10】cogroup()
含义:将两个 RDD 中拥有相同键
的数据分组到一起。
rdd1.cogroup(rdd2).collect().foreach(println(_))
代码解释:value 中第一个 CompactBuffer 为 rdd1 的值,第二个 CompactBuffer 为 rdd2 的值。
3. 行动算子
【1】countByKey()
含义:对每个键对应的元素分别计数。
val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5))) rdd.countByKey().foreach(println(_))
代码解释:键为1
出现了1次,2
出现了2次,3
出现了1次。
【2】lookup()
含义:返回给定键对应的所有值。
rdd.lookup(2)
【3】collectAsMap()
含义:将结果以映射表的形式返回,key 如果重复,后边的元素会覆盖前面的元素。与 collect 类似,但适用于键值 RDD 并且会保留其键值结构。
rdd.collectAsMap()
0x02 RDD的缓存与持久化
1. 缓存与持久化的意义
在大数据处理场景中,我们的数据量会达到TB、甚至PB级别,并且会重复调用同一组数据,如果每一次调用都要重新计算,将会非常消耗资源,所以我们可以对处理过程中的中间数据进行数据缓存,或者持久化到内存或者磁盘中。
2. 缓存
我们可以对 RDD 使用 cache()方法进行缓存,即在集群相关节点的内存中进行缓存。
首先,我们需要引入相关的模块:
注意:务必先启动HDFS再执行下面的代码!
put.txt
文件为:
shao nai yi nai nai yi yi shao nai nai
val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt") val wordRDD = textFileRDD.flatMap(line => line.split(" ")) val pairWordRDD = wordRDD.map(word => (word, 1)) val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b) wordCountRDD.cache() // 这里还没有执行缓存 wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存