主题
在sparkRDD的转换操作中,有几个比较特殊的聚合操作,很容易受到分区数的影响,很容易迷惑初学者,比如fold,aggregate等,他们都有初始值zeroValue,在多分区数据集的情况下,初始值的加入和分区数的变化,会导致不同的计算结果。因为他们在分区内部做一次带有zeroValue的聚合后,在对不同分区聚合结果进行合并的时候,会再做一次带有zeroValue的聚合。我们看如下代码
package com.xhc.sparkscala import org.apache.spark.{SparkConf, SparkContext} object AggPartitionTest { def main(args: Array[String]): Unit = { // spark上下文 val conf = new SparkConf().setAppName("AggPartitionTest").setMaster("local[*]") val sc = SparkContext.getOrCreate(conf) // 数据 val data = Array(1, 2, 3, 4, 5) // 不同分区数进行聚合 for (n <- 1 to 3){ // 分区数据 val distData = sc.parallelize(data).repartition(n) // 打印分区及分区数据 distData.mapPartitionsWithIndex((idx, items)=>{items.map(x=>(idx, x))}).foreach(println) // 计算 val foldResult = distData.fold(2)((a, b) => a + b) val aggResult = distData.aggregate(2)((a, b) => a + b, (c, d) => c + d) // 打印计算结果 println("分区数:%d,fold结果:%d!".format(n, foldResult)) println("分区数:%d,agg结果:%d!".format(n, aggResult)) } } }
运行结果如下:
分区及分区数据: (0,1) (0,2) (0,3) (0,4) (0,5) 分区数:1,fold结果:19! 分区数:1,agg结果:19! 分区及分区数据: (1,3) (0,1) (1,4) (0,2) (0,5) 分区数:2,fold结果:21! 分区数:2,agg结果:21! 分区及分区数据: (0,4) (1,2) (2,1) (0,5) (2,3) 分区数:3,fold结果:23! 分区数:3,agg结果:23!
看到了吧!不同的分区会导致不同聚合结果,我们以fold为例,用图来说明一下上例中的计算过
aggregate计算过程的分析与此类似噢!!
作者这水平有限,有不足之处欢迎留言指正