一.引言
Spark - 一文搞懂 Partitioner 一文中介绍了 Spark Partitioner 内几种常见的 Partition 分区函数,HashPartitioner 使用 Hash 算法进行分区,而 RangePartitioner 则是对全局的 partition 进行采样获取 keyList,随后构造 rangeBounds 将 key 尽可能的按范围分到新的分区内,本文主要探索 RangePartitioner 源码中 rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。
二.源码分析
1.RangePartitioner
RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。
编辑
RangePartition 通过原始 rdd 的 partition 信息采样获取 rangeBounds: Array[K],其中 K 为分区 key 对应的数据类型, partitions 参数为待分区数目,通常情况下 rangeBounds 的长度为 partitions - 1,因为 n-1 个边界可以分出 n 个区域。
编辑
后面 getPartition 中 key: Any 对上上述 rangeBounds 的类型 K,这里只是根据数组长度决定采用暴力遍历法还是二分查找法,与之前提到的 Scala - 数值型特征分桶 有异曲同工之妙,有兴趣的同学可以看看。
2.rangeBounds
从上面 RangePartition 的源码就可以看出其实现 range 分区全依赖 rangeBounds 的生成,下面先简单看下 rangeBounds 生成的源码,后续会使用 demo 复现源码实现步骤:
编辑
上面是 rangeBounds 生成的源码,大致可以分为 ABCD 四个部分,下面对 ABCD 四个部分做简要的解释,随后复现源码并详细分析:
A.sketch 简要
sampleSize 为总采样数量,sampleSizePerPartition 为每个 partition 的采样数量,这两个数字的计算都与 RangePartition 的传参有关。这里最关键的就是 sketch 过程,可以理解为简要采样,粗略采样都可以,这一步主要实现从 partition 进行分区 key 的粗采样过程。
B.addAndFilter 添加与过滤
这一部分基于 sketch 生成的采样数据进行筛选,根据 fraction 和 sampleSize 确定是否有数据倾斜即数据不匀的 partition 存在,对于倾斜的 partition 记录其 partitionId,正常的数据则加入 candidate 候选集准备后续的采样。
C.reSample 重采样
最后一步首先判断是否有倾斜分区,有倾斜分区则使用 PruningRdd 进行 resample 重采样,PruningRdd 我们之前也单独讲解过,其主要应用于选取对应 parition 运行任务,避免启动过多 task,详情可以参考 Spark - PartitionPruningRDD 详解。
D.genBounds 分区数组生成
重采样后,调用 determineBounds 生成最终的 rangeBounds。
三.源码复现
经过上面的粗略分析,这里再重复一下 rangeBounds 的生成过程:
A.sketch - 粗略采样
B.addAndFilter - 筛选合规与不合规数据
C.resample - 重采样
D.genBounds 分区数组生成
复现前首先初始化一个 RDD 供后续使用:
val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]") val spark = SparkSession .builder .config(conf) .getOrCreate() val sc = spark.sparkContext sc.setLogLevel("error") val oriPartitions = 5 val randomArray = (0 to 100000).toArray.toList val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex val rdd = sc.parallelize(info).repartition(oriPartitions)
local 模式下将 100000 个数据分配到 5 个 partition 上,我们的目标是模拟 RangePartition 生成 rangeBounds 使得数据大致分配到 10 个 partition 上,即 RangeBounds 的长度为 9。Rdd 的形式为 RDD[(Int, Int)]。
1.sketch 简采样
// 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。 //强制转换为双精度,以避免int或long溢出 val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6) println(s"SampleSize: $sampleSize") // 假设输入分区大致平衡,并且有点过采样。 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt println(s"SampleSizePerPartition: $sampleSizePerPartition")
首先根据源码的信息以及我们的待分区信息确定 sampleSize 与 sampleSizePerpartition,其中 samplePointsPerPartitionHint = 20 为 RangePartition 内的默认参数,我们也可以自定义修改。上述需求将 partition=5 转换到 partition=10,所以 partitions=10,通过计算可以得到如下信息:
编辑
其中 sampleSizePerPartition 做了 *3 的处理,因此有一定的过采样情况出现,经过计算要采样 200 条样本,其中每个 partition 大致提供 200 条样本。
A.sketch API
编辑
B.自定义 sketch
def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = { val shift = rdd.id // val classTagK=classTag[K] // 避免序列化整个partitioner对象 val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) // 还返回输入大小的水库采样实现 val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() sketched.foreach(x => { println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}") }) val numItems = sketched.map(_._2).sum (numItems, sketched) }
基本是把上面的照搬,下面基本解释下实验过程,mapPartitionWithIndex 获得的 idx 为对应 partition 的分区 id,可以通过 TaskContext.getPartitionId() 获取,iter 为对应 rdd 的迭代器,其形式为 Iterator[T],对应本例为 RDD[Int],首先通过 idx 计算随机 seed,根据 idx 生成随机 seed 是避免所有 partition 采用相同 seed 采样造成不泛化的采样结果。随后调用 reservoirSampleAndCount 水库采样法对 partition 内的数据进行采样并通过 Iterator 返回,采样返回的三元组分别为 partitionId,partition内数据总量,partition内采样数据量。 numItems 为当前 parititon 下的总样本数,所以 sketch 函数最后返回的数据为 (numItems,sketched) 即 (采样 RDD 总数据量,(partitionId,partition 内数据量,partitions 内采样数据量))。
2.reservoirSampleAndCount 水池采样
水库采样法
A.reservoirSample API
编辑
B.自定义 reservoirSample
编辑
这里将原函数照搬并添加一些注释与修改,用于配合自定义的 sketch 使用,水池采样大致分为三步,假设 parition 内数据量为 n,需采样 sampleSizePerPartition 条样本即 k,下面大致讲解下过程:
A.填满水池
首先将 n 条数据的前 k 条数据放入水池中填满
B.水源不足 n < k
这里如果 parititon 内 n 条数据不足以填满容量为 k 的水池,则只能返回 n 条数据了(n < k)
C.水源充足 n >= k
n > k 时先固定好水池中的 k 条数据,此时消费完第 l 条数据,所以满足均匀分布 uniform[0, l] 中选取前 k 的范围,所以使用 random 随机数生成器生成数据后需要判断是否在 k/l 的概率范围内,只有落在 k 内才会采样并替换水池中对应索引的元素,所以采样过程可以理解为一定概率换水。最终采样完毕后返回 reservior: Array[T](k),即包含 k 个元素的类型 T 的 key。
简单画个示意图:
编辑
3.AddAndFilter 添加与过滤
经过 sketch 简采样和水池采样后得到 (numItems,sketched)
numItems:采样 RDD 总数据量
sketched: (partitionId,partition 内数据量,partition 内采样数据量)
下面基于上述信息进行 candidate 候选集的添加与筛选
A.AddAndFilter API
编辑
B.自定义 AddAndFilter
编辑
numItems 为 0 代表 RDD 为空,此时返回空候选集。正常情况下首先计算 fraction,sampleSize/numItems 即我们最一开始根据公式计算得到的,后面将依据该 fraction 与 partition 内的采样情况进行筛选:
遍历 sketch 的三元组,其中 n 为 partition 内数据量,sample 为 partition 内采样数据量,如果 n * fraction > sampleSizePerPartition 则代表当前 idx 对应的 partition 内数据超过平均值,此时将 idx 加入 imbalancedPartitions 的 Set 中,后续需要重新采样,如果 partition 内正常的话,则计算 weight,weight 为每条数据的采样间隔 = 1/ sampleRatio,其中 sampleRatio = sample / n,将 (key, weight) 添加到候选集数组中。
4.resample 重采样
对于数据倾斜的分区 partition,
A.resample API
编辑
B.自定义 resample
编辑
原封不动的粘贴下来,PartitionPruningRDD 上一篇文章提到过,imbalancedPartitions 内包含了数据倾斜的 partition 对应的 idx,使用 rdd 自带的 sample 方法,按最初计算的 fraction 进行采样,随后将 (key, weight) 添加至 candidate 候选集。
5.genBounds 生成边界
待数据倾斜的分区 sample 完毕后,候选集 candidates 也准备完毕,根据 (key, weight) 即可生成最终的 rangeBounds:
A.genBounds API
编辑
B.自定义 genBounds
编辑
代码比较长,但是整体理解不难,首先通过隐函数 implict 进行 candidate _._1 的排序,这里可以使 K 继承 Ordering 类,也可以像例子中的 Int,String 一样,自带默认 Ordering 属性进行排序,生成最终 bounds 需要两个辅助变量 i、j,变量 i 负责控制 candidates 候选集的所有 _._1 key,j 负责控制循环退出条件,即 bounds 达到 partitions - 1 即退出循环。每超过步长 step,将对应对应的 key 添加至 Bounds,随后将 target 增加一个步长 step。ordering.gt 则是为了防止出现重复值,最终 bounds.toArray 生成最终的 bounds。
6.RangeBounds 生成完整代码
mport org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.util.random.XORShiftRandom import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import scala.util.Random import scala.util.hashing.byteswap32 object TestRange { def determineBounds[K : Ordering : ClassTag](candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] // 也可以自定义隐函数支持 sort 逻辑 val ordered = candidates.sortBy(_._1) // 根据 key-weight 的 key 进行排序 val numCandidates = ordered.size // 候选集大小 val sumWeights = ordered.map(_._2.toDouble).sum // 权重总和 val step = sumWeights / partitions // 分权重 // 初始化 weight、步长以及空的边界,K 与 key 的类型保持一致 var cumWeight = 0.0 var target = step val bounds = ArrayBuffer.empty[K] var i = 0 // 候选集元素遍历量 var j = 0 // 当前 boundary 的元素 var previousBound = Option.empty[K] // j = partitions - 1时退出,因为 n-1 个边界可以划定n个区域 while ((i < numCandidates) && (j < partitions - 1)) { val (key, weight) = ordered(i) cumWeight += weight // weight 累加 if (cumWeight >= target) { // Skip duplicate values. 跳过重复值 if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key // 添加值 target += step // 进化到下一个 step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray } def reservoirSampleAndCount[T: ClassTag]( input: Iterator[T], k: Int, seed: Long = Random.nextLong()) : (Array[T], Long) = { val reservoir = new Array[T](k) // Put the first k elements in the reservoir. // 放置前 K 个元素 var i = 0 while (i < k && input.hasNext) { val item = input.next() reservoir(i) = item i += 1 } // If we have consumed all the elements, return them. Otherwise do the replacement. if (i < k) { // If input size < k, trim the array to return only an array of input size. val trimReservoir = new Array[T](i) System.arraycopy(reservoir, 0, trimReservoir, 0, i) (trimReservoir, i) } else { // If input size > k, continue the sampling process. var l = i.toLong val rand = new XORShiftRandom() // val rand = new Random() rand.setSeed(seed) while (input.hasNext) { val item = input.next() l += 1 // There are k elements in the reservoir, and the l-th element has been // consumed. It should be chosen with probability k/l. The expression // below is a random long chosen uniformly from [0,l) val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { reservoir(replacementIndex.toInt) = item } } (reservoir, l) } } def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = { val shift = rdd.id // val classTagK=classTag[K] // 避免序列化整个partitioner对象 val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) // 还返回输入大小的水库采样实现 val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() sketched.foreach(x => { println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}") }) val numItems = sketched.map(_._2).sum (numItems, sketched) } def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]") val spark = SparkSession .builder .config(conf) .getOrCreate() val sc = spark.sparkContext sc.setLogLevel("error") val oriPartitions = 5 val randomArray = (0 to 100000).toArray.toList val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex val rdd = sc.parallelize(info).repartition(oriPartitions) val partitions = 10 // 待分区数目 val samplePointsPerPartitionHint: Int = 20 // 采样数 val rddPartitionsLength = rdd.partitions.length // 分区数 println(s"OriPartition: $oriPartitions RangeToPartition: $partitions") var candidatesList: Array[Int] = Array.empty[Int] if (partitions <= 1) { Array.empty[Int] } else { // 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。 //强制转换为双精度,以避免int或long溢出 val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6) println(s"SampleSize: $sampleSize") // 假设输入分区大致平衡,并且有点过采样。 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt println(s"SampleSizePerPartition: $sampleSizePerPartition") val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { candidatesList = Array.empty[Int] } else { // If a partition contains much more than the average number of items, we re-sample from it // to ensure that enough items are collected from that partition. // 如果一个分区包含的项目数远远超过平均数,我们将从中重新采样 // 以确保从该分区收集足够的项。 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) println(s"sampleSize: $sampleSize numItems: $numItems fraction $fraction") val candidates = ArrayBuffer.empty[(Int, Float)] val imbalancedPartitions = mutable.Set.empty[Int] sketched.foreach { case (idx, n, sample) => if (fraction * n > sampleSizePerPartition) { // 远超平均数则重新采样,将对应 id 加入 imbalancedPartitions println(s"id: $idx n: $n sample: ${sample.length} fraction: ${fraction * n}") imbalancedPartitions += idx } else { // The weight is 1 over the sampling probability. // 权重为抽样概率的1 val weight = (n.toDouble / sample.length).toFloat println(s"weight: $weight n: $n sample: ${sample.length}") for (key <- sample) { // 加入候选集 candidates += ((key, weight)) } } } if (imbalancedPartitions.nonEmpty) { // Re-sample imbalanced partitions with the desired sampling probability. // 以所需的采样概率重新采样不平衡分区。 val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id - 1) val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) } // candidates: [key-weight] partitions: 自定义 range 分区 candidates.size: 候选集大小 candidatesList = determineBounds(candidates, math.min(partitions, candidates.size)) } } println("候选集长度:" + candidatesList.length) println(candidatesList.mkString(" | ")) } }
通过粘贴复制源码,并做适当修改后得到了上述生成 RangeBounds 的类源码,下面分别采用 shuffle 后的数组,长度分别为 10000、100000、1000000 进行 range = 10 的生成。
10000:
1124 | 2183 | 3055 | 3991 | 4946 | 6075 | 7068 | 8087 | 9154
100000:
10624 | 19696 | 28600 | 37147 | 46209 | 56248 | 67644 | 78822 | 88162
100000:
101034 | 169888 | 270260 | 379722 | 473229 | 579174 | 720440 | 813207 | 907043
随着 partition 内数据的增加,采样的准确度也会受到相应的影响,所以我们使用 RangePartition 时经常会得到不那么均匀的分区。
四.总结
RangePartition 在工业场景下使用的次数并不多,但是通过 RangePartition 源码的 bounds 生成可以学习到以下知识点,还是非常的奈斯。
sketch: 粗采样,用于对各个 partition 内的 key 进行采样
reservoirSampleAndCount: 水库采样
byteswap32: 生成随机数 seed
PartitionPruningRdd: 生成对应 partition 的简易 RDD
sample: rdd 采样方法
XORShiftRandom: 一种快速的随机数生成算法 [后续会单独进行推理与代码验证]
determineBounds: 根据候选集生成对应长度的 bounds
学习 bounds 的生成同时也为大规模分布式数据下的数据采样提供了思路,不论最终是否要生成有序的采样 key,水库采样、随机数生成、PartitionPruningRdd、快速随机数生成方法等都值得借鉴。