RDD 算子_转换_ combineByKey | 学习笔记

简介: 快速学习 RDD 算子_转换_ combineByKey

开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段RDD 算子_转换_ combineByKey学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11973


RDD 算子_转换_ combineByKey


内容介绍

一、前言

二、介绍 combineByKey

三、为什么要用到 combineByKey

四、总结


一、前言

在之前的课程当中,我们学习了两个算子,即 reduceByKey groupByKey ,他们基于同一个算子来实现算法,这个算子叫做combineByKey combine 是聚合的意思。

combineByKey 对于 groupByKey reduceByKey 来说会更复杂一些,但它会更偏底层一些。同时它的可控制性会更强,它会有更多的函数交给用户去实现,用户就会更好的控制整个过程。


二、介绍 combineByKey

首先,我们先了解一下 combineByKey 的作用以及 combineByKey 所接收的函数。

combineByKey 的作用是对数据集按照 Key 进行聚合。该聚合既不是 reduce 也不是 group ,它只是一个聚合,由用户来控制。

调用方式如下:

combineByKey 里面接收6个参数,前三个参数是 createCombiner ,将 Value 进行初步的转换,即将原本的 Value 转化成另一个形式; mergeValue ,即在每个分区把上一步转换的结果进行聚合;以及 mergeCombiners ,即在所有分区上把每个分区的聚合结果进行聚合,这三个参数都是需要用户传进的函数。

后三个参数依次是 partitioner ,即分区或分区方式,怎么分区是可以用获取指定;然后是 mapSideCombiner ,是一个布尔值,通过控制它,我们可以控制是否在 map 端进行 combine ;最后的 serializer 是一个序列化器。一般情况下,后三个参数是可以默认不指定的。

如果要理解 combineByKey ,就要理解前三个参数(函数)的意义。

combineByKey 较难,需要多花点时间理解,尤其是前三个参数(函

数)。


三、为什么要用到 combineByKey

1.需求

首先介绍一下需求。现在有这样的一组数据,该数据可以想象为是用户的多门成绩。张三的第一门成绩是99分,第二门成绩是96分,第三门成绩是97。李四的第一门成绩是97,第二门成绩是98

接下来需要求得平均数,也就是每一个人成绩的平均数。首先我们需要知道平均数是怎么求的,即总分数除以科目数。然后开始我们的第一步,求出总分数,即求和,第二步是求科目数,即频率统计,这是我们的一个需求。

如图所示,最前面一列是初始输入的 RDD ,最后一列是生成结果的

RDD 。最初始的 RDD 通过中间的三个步骤转变成最终的 RDD ,我

们分别来看一下。

 1.png

首先, createCombiner 作用于每一条数据。注意(“ zhangsan ”,99.0),通过 createCombiner 函数传入后,会转成(“ zhangsan”,(99.01)),也就是  partition1 里面转换过的两个数据。mergeValue 把分区里面的数据给进行一次聚合,聚合生成了 (“zhangsan ”,(185.02)) ,这是第一个分区的结果。在进行运算时,第一个分区与第二、第三分区是同时进行的运算的,因此会同时得到三个聚合结果。

接下来是 mergeCombiners 操作,作用于所有的分区上,将分区相同的数据聚合到同一个分区里。将分区一和分区三的数据再次聚合,求出了 (“ zhangsan ”,(282.03)) ,分区二也是同理。

接下来我们看细节变化,第一个细节在我们第一步操作上。第一步操作是把99.0转成(99.01),是由 createCombiner 函数完成。第二步操作是把(99.01)转成(185.02),这一步是由 mergeValue 完成。第三步把(185.02)转变成(282.03),这一步是由 mergeCombiners 完成,这就是一整个过程。

2.实例

在了解整个过程之后,开始编写需求的代码。

首先,复制集合,进入代码,创建一个测试方法,test 注解开头。 combineByKey 是一个非常高级的一个聚合方法,同时也是一个非常底层的聚合方法,他的控制性非常的强,但是会比较的复杂,它可以去控制每一步的聚合的,控制分区里的操作,以及控制所有分区结果的操作。第一步是准备集合,第二步是具体的算子操作,第三步是获取结果,

打印结果。在操作部分又分为几个小步骤。第一步,是 createCombiner 转换数据,第二步是 mergeValue 分区上的聚合,第三步是 mergeCombiners 把所有分区上的结果再次聚合,生成最终结果。第一步,准备集合。直接将刚才复制的代码拷过来,接下来是一个小操作,为RDD准备类型。首先它是一个 RDD ,里面存放的是 ( String , Double ) ,它是一个 KeyValue 型的数据。第二步,算子操作。拿到 RDD 直接调用 combineByKey ,它接受三个参数,第一个叫做 createCombiner ,第二个叫 mergeValue ,第三个叫做 mergeCombiners 。第一个函数要用于转换数据,要先接收一个函数 ( arg ) ,然后是 returnValue arg 这块直接是 Double ,因为算子可以直接进行聚合,只需要赋值就行了。这时我们拿到的数据叫做 curr , curr 的类型是  double ,返回结果是 ( curr 1) 。完成该项操作后,就可以在分区上进行一次初步聚合,但是这一点大家需要注意,mergeValue createCombiner 是一体的,即数据进来先经历 createCombiner ,再经历 mergeValue 。在 mergeValue 当中,它的泛型结果是传入的两个参数,第一个参数是上一步转化后的数据为 ( curr :( Double Int )) ,第二个参数为 ( agg :( Double Int )) 。第二个参数传进来的并不是一个局部聚合的结果,而是当前 RDD 的下一条数据,然后这两个参数的值进行聚合。因此 createCombiner 可以理解为只作用于第一条数据。然后将 (agg :( Double Int )) 修改为 ( nextValue Double ) ,最终生成 ( curr._1 + nextValue , curr._2 + 1) 。 最后一个函数就是当前两个函数作用于每一个分区,并得到结果后,需要经历 mergeCombiners ,对所有分区的结果进行一次聚合。它将会接收到两个参数,一个是当前值curr ,另一个是局部聚合结果 agg 。第一个参数的类型是 ( Double Int ) ,第二个参数的类型是 (Double Int ) ,然后进行聚合,最后得出结果 (curr._1 + agg._1 , curr._2 + agg._2 ),到这一整个流程就完成了。最后运行代码进行数据处理,得出并核对结果。

3.代码

@Test

def combineByKey(): Unit ={

//1.准备集合

val rdd: RDD[(String,Double)] = sc.parallelize((

("zhangsan",99.0),

("zhangsan",96.0),

("lisi",97.0),

("lisi",98.0),

("zhangsan",97.0))

)

//2.算子操作

val combineResult = rdd.combineByKey(

createCombiner = (curr:Double) => (curr,1),

mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),

mergeCombiners = (curr: (Double,Int), agg: (Double,Int)) => (curr._1 + agg._1, curr._2 + agg._2)

)

//("zhangsan", (99 + 96 + 97, 3))

val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2) )

//3.获取结果,打印结果

resultRDD.collect().foreach(println(_))

}


四、总结

简单总结一下,首先, combineByKey 这个算子中接收三个参数(函数)。第一个函数是转换数据的函数,也就是一个初始函数,这个初始函数有一个特点,它只作用于第一条数据,用于开启整个计算。第二个函数是在分区上进行聚合,第三个函数是把所有分区的聚合结果聚合成最终结果。

这就是 combineByKey 的整个步骤。 combineByKey 确实比较难,最重要的是要理解 createCombiner mergeValue mergeCombiners 这三个函数的作用。

相关文章
|
存储 缓存 分布式计算
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
150 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
|
分布式计算
|
分布式计算 大数据 开发者
RDD 算子_转换_ aggregateByKey | 学习笔记
快速学习 RDD 算子_转换_ aggregateByKey
110 0
RDD 算子_转换_ aggregateByKey | 学习笔记
|
分布式计算 大数据 Scala
RDD 算子_转换_ foldByKey | 学习笔记
快速学习 RDD 算子_转换_ foldByKey
161 0
RDD 算子_转换_  foldByKey | 学习笔记
|
分布式计算 算法 大数据
Rdd 算子_转换_mapvalues | 学习笔记
快速学习 Rdd 算子_转换_mapvalues
131 0
Rdd 算子_转换_mapvalues | 学习笔记
|
分布式计算 大数据 Spark
Rdd 算子_转换_回顾 | 学习笔记
快速学习 Rdd 算子_转换_回顾
Rdd 算子_转换_回顾 | 学习笔记
|
分布式计算 大数据 开发者
Rdd 算子_转换_groupbykey | 学习笔记
快速学习 Rdd 算子_转换_groupbykey
144 0
Rdd 算子_转换_groupbykey | 学习笔记
|
分布式计算 大数据 Spark
RDD 算子_转换_排序 | 学习笔记
快速学习 RDD 算子_转换_排序
RDD 算子_转换_排序 | 学习笔记
|
分布式计算 大数据 Spark
Rdd 算子_转换_集合操作 | 学习笔记
快速学习 Rdd 算子_转换_集合操作
Rdd 算子_转换_集合操作 | 学习笔记
|
分布式计算 大数据 开发者
RDD 算子_转换 join | 学习笔记
快速学习 RDD 算子_转换 join
104 0
RDD 算子_转换 join | 学习笔记