开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段:RDD 算子_转换_ combineByKey】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11973
RDD 算子_转换_ combineByKey
内容介绍
一、前言
二、介绍 combineByKey
三、为什么要用到 combineByKey
四、总结
一、前言
在之前的课程当中,我们学习了两个算子,即 reduceByKey 和 groupByKey ,他们基于同一个算子来实现算法,这个算子叫做combineByKey , combine 是聚合的意思。
combineByKey 对于 groupByKey 和 reduceByKey 来说会更复杂一些,但它会更偏底层一些。同时它的可控制性会更强,它会有更多的函数交给用户去实现,用户就会更好的控制整个过程。
二、介绍 combineByKey
首先,我们先了解一下 combineByKey 的作用以及 combineByKey 所接收的函数。
combineByKey 的作用是对数据集按照 Key 进行聚合。该聚合既不是 reduce 也不是 group ,它只是一个聚合,由用户来控制。
调用方式如下:
combineByKey 里面接收6个参数,前三个参数是 createCombiner ,将 Value 进行初步的转换,即将原本的 Value 转化成另一个形式; mergeValue ,即在每个分区把上一步转换的结果进行聚合;以及 mergeCombiners ,即在所有分区上把每个分区的聚合结果进行聚合,这三个参数都是需要用户传进的函数。
后三个参数依次是 partitioner ,即分区或分区方式,怎么分区是可以用获取指定;然后是 mapSideCombiner ,是一个布尔值,通过控制它,我们可以控制是否在 map 端进行 combine ;最后的 serializer 是一个序列化器。一般情况下,后三个参数是可以默认不指定的。
如果要理解 combineByKey ,就要理解前三个参数(函数)的意义。
combineByKey 较难,需要多花点时间理解,尤其是前三个参数(函
数)。
三、为什么要用到 combineByKey
1.需求
首先介绍一下需求。现在有这样的一组数据,该数据可以想象为是用户的多门成绩。张三的第一门成绩是99分,第二门成绩是96分,第三门成绩是97。李四的第一门成绩是97,第二门成绩是98。
接下来需要求得平均数,也就是每一个人成绩的平均数。首先我们需要知道平均数是怎么求的,即总分数除以科目数。然后开始我们的第一步,求出总分数,即求和,第二步是求科目数,即频率统计,这是我们的一个需求。
如图所示,最前面一列是初始输入的 RDD ,最后一列是生成结果的
RDD 。最初始的 RDD 通过中间的三个步骤转变成最终的 RDD ,我
们分别来看一下。
首先, createCombiner 作用于每一条数据。注意(“ zhangsan ”,99.0),通过 createCombiner 函数传入后,会转成(“ zhangsan”,(99.0,1)),也就是 partition1 里面转换过的两个数据。mergeValue 把分区里面的数据给进行一次聚合,聚合生成了 (“zhangsan ”,(185.0,2)) ,这是第一个分区的结果。在进行运算时,第一个分区与第二、第三分区是同时进行的运算的,因此会同时得到三个聚合结果。
接下来是 mergeCombiners 操作,作用于所有的分区上,将分区相同的数据聚合到同一个分区里。将分区一和分区三的数据再次聚合,求出了 (“ zhangsan ”,(282.0,3)) ,分区二也是同理。
接下来我们看细节变化,第一个细节在我们第一步操作上。第一步操作是把99.0转成(99.0,1),是由 createCombiner 函数完成。第二步操作是把(99.0,1)转成(185.0,2),这一步是由 mergeValue 完成。第三步把(185.0,2)转变成(282.0,3),这一步是由 mergeCombiners 完成,这就是一整个过程。
2.实例
在了解整个过程之后,开始编写需求的代码。
首先,复制集合,进入代码,创建一个测试方法,test 注解开头。 combineByKey 是一个非常高级的一个聚合方法,同时也是一个非常底层的聚合方法,他的控制性非常的强,但是会比较的复杂,它可以去控制每一步的聚合的,控制分区里的操作,以及控制所有分区结果的操作。第一步是准备集合,第二步是具体的算子操作,第三步是获取结果,
打印结果。在操作部分又分为几个小步骤。第一步,是 createCombiner 转换数据,第二步是 mergeValue 分区上的聚合,第三步是 mergeCombiners 把所有分区上的结果再次聚合,生成最终结果。第一步,准备集合。直接将刚才复制的代码拷过来,接下来是一个小操作,为RDD准备类型。首先它是一个 RDD ,里面存放的是 ( String , Double ) ,它是一个 KeyValue 型的数据。第二步,算子操作。拿到 RDD 直接调用 combineByKey ,它接受三个参数,第一个叫做 createCombiner ,第二个叫 mergeValue ,第三个叫做 mergeCombiners 。第一个函数要用于转换数据,要先接收一个函数 ( arg ) ,然后是 returnValue 。 arg 这块直接是 Double ,因为算子可以直接进行聚合,只需要赋值就行了。这时我们拿到的数据叫做 curr , curr 的类型是 double ,返回结果是 ( curr ,1) 。完成该项操作后,就可以在分区上进行一次初步聚合,但是这一点大家需要注意,mergeValue 和 createCombiner 是一体的,即数据进来先经历 createCombiner ,再经历 mergeValue 。在 mergeValue 当中,它的泛型结果是传入的两个参数,第一个参数是上一步转化后的数据为 ( curr :( Double , Int )) ,第二个参数为 ( agg :( Double , Int )) 。第二个参数传进来的并不是一个局部聚合的结果,而是当前 RDD 的下一条数据,然后这两个参数的值进行聚合。因此 createCombiner 可以理解为只作用于第一条数据。然后将 (agg :( Double , Int )) 修改为 ( nextValue : Double ) ,最终生成 ( curr._1 + nextValue , curr._2 + 1) 。 最后一个函数就是当前两个函数作用于每一个分区,并得到结果后,需要经历 mergeCombiners ,对所有分区的结果进行一次聚合。它将会接收到两个参数,一个是当前值curr ,另一个是局部聚合结果 agg 。第一个参数的类型是 ( Double , Int ) ,第二个参数的类型是 (Double , Int ) ,然后进行聚合,最后得出结果 (curr._1 + agg._1 , curr._2 + agg._2 ),到这一整个流程就完成了。最后运行代码进行数据处理,得出并核对结果。
3.代码
@Test
def combineByKey(): Unit ={
//1.准备集合
val rdd: RDD[(String,Double)] = sc.parallelize((
("zhangsan",99.0),
("zhangsan",96.0),
("lisi",97.0),
("lisi",98.0),
("zhangsan",97.0))
)
//2.算子操作
val combineResult = rdd.combineByKey(
createCombiner = (curr:Double) => (curr,1),
mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
mergeCombiners = (curr: (Double,Int), agg: (Double,Int)) => (curr._1 + agg._1, curr._2 + agg._2)
)
//("zhangsan", (99 + 96 + 97, 3))
val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2) )
//3.获取结果,打印结果
resultRDD.collect().foreach(println(_))
}
四、总结
简单总结一下,首先, combineByKey 这个算子中接收三个参数(函数)。第一个函数是转换数据的函数,也就是一个初始函数,这个初始函数有一个特点,它只作用于第一条数据,用于开启整个计算。第二个函数是在分区上进行聚合,第三个函数是把所有分区的聚合结果聚合成最终结果。
这就是 combineByKey 的整个步骤。 combineByKey 确实比较难,最重要的是要理解 createCombiner 、 mergeValue 、 mergeCombiners 这三个函数的作用。