开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段:RDD 算子_转换_ aggregateByKey】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11975
RDD 算子_转换_ aggregateByKey
内容介绍
一、前言
二、介绍 aggregateByKey
三、实例
四、总结
一、前言
前面我们了解了一个算子叫做 foldByKey ,它有一个很相像的算子叫做reduceByKey , reduceByKey 的底层是 combineByKey ,而 foldByKey 的底层是 aggregateByKey 。
aggregateByKey 有点类似于 combineByKey ,也是一个聚合操作。
二、介绍 aggregateByKey
aggregateByKey 的作用是聚和所有 Key 相同的 Value ,即按照
Key 聚合 Value 。作用与 combineByKey 相同,但聚合规则不同。
首先 aggregateByKey 接收两部分函数,第一个函数是初始值,即可以指定一个初始值;第二个函数是可以指定两个参数, seqOp 和 combOp ,这两个参数都是函数。
理解 aggregateByKey 其实就是理解 seqOp 和 combOp 。seqOp 是作用于每个值的函数,而 combOp 是将转换过的值进行聚合的函数。
三、实例
如图所示,第一个 RDD 中有三个分区,每个分区都有一条与商品价格相关的数据,现在要求所有的商品打8折,那么整个商品的总价是多少。这是无法使用 combineByKey 或者 reduceByKey 完成的,首先对原有价格进行初始处理,会得到中间框架里的结果,然后将这些结果聚合起来,相同的 Key 合在一起,最终会得到商品的总价。
seqOp 作用于每一条数据,每条数据对应的 Value 进行处理,在这里就是把10×0.8。 combOp 是把整体上的每一个结果来进行聚合,生成最终的结果,这一整个过程就是 aggregateByKey 。
然后我们用代码来进行计算一下。复制集合进入代码,创建新函数。首先是准备函数,可以直接调用,然后确定指定折数,为0.8。
第一步接收参数,第一个参数是 zeroValue ,第二个参数是具体的值,即 item 。接下来是得到折扣后的数值,即 item × zeroValue 。
接收第二个函数,用于将整体的结果来进行聚合。那么其接收为 curr
和agg ,我们需要完成的是 curr + agg ,因为 curr 是一个 Double
类型,所以可以与agg 相加。 aggregateByKey 里面接收的是 curr
和 agg ,因已经处理过 Key 了,故只有值传到。经过处理后,可调
用 collect ,进行 foreach 循环,打印 print 。
运行代码后,可以发现结果相差无异,这就是 aggregateByKey ,稍
微有一点点绕。
代码如下:
@Test
def aggregateByKey() : Unit = {
val rdd = sc.parallelize(seq(("
手机", 10.0),("手机",15.0),("电脑",20.0)))
rdd.aggregateByKey(zeroValue = 0.8)((zeroValue,item) >= item * zeroValue,(curr,agg)=>curr+agg)
.collect()
.foreach(println(_))
}
四、总结
简单总结一下,首先是 aggregateByKey 的调用方式,第一个函数中是 zeroValue ,第二个函数中是 seqOp 和 combOp 。 zeroValue 是指定初始值; seqOp 是作用于每一个元素,根据初始值进行计算; combOp 是将 seqOp 处理过的结果进行聚合。
aggregateByKey 特别适合针对每个数据要先处理,后聚合的场景。这是一个比较高级的应用,但是偶尔也会遇到。