Spark - RangePartitioner rangeBounds 生成 源码分析 & 实践

简介: 本文主要探索RangePartitioner 源码中rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。

一.引言

Spark - 一文搞懂 Partitioner 一文中介绍了 Spark Partitioner 内几种常见的 Partition 分区函数,HashPartitioner 使用 Hash 算法进行分区,而 RangePartitioner 则是对全局的 partition 进行采样获取 keyList,随后构造 rangeBounds 将 key 尽可能的按范围分到新的分区内,本文主要探索 RangePartitioner 源码中 rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。

二.源码分析

1.RangePartitioner

RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。

image.gif编辑

RangePartition 通过原始 rdd 的 partition 信息采样获取 rangeBounds: Array[K],其中 K 为分区 key 对应的数据类型, partitions 参数为待分区数目,通常情况下 rangeBounds 的长度为 partitions - 1,因为 n-1 个边界可以分出 n 个区域。

image.gif编辑

后面 getPartition 中 key: Any 对上上述 rangeBounds 的类型 K,这里只是根据数组长度决定采用暴力遍历法还是二分查找法,与之前提到的 Scala - 数值型特征分桶 有异曲同工之妙,有兴趣的同学可以看看。

2.rangeBounds

从上面 RangePartition 的源码就可以看出其实现 range 分区全依赖 rangeBounds 的生成,下面先简单看下 rangeBounds 生成的源码,后续会使用 demo 复现源码实现步骤:

image.gif编辑

上面是 rangeBounds 生成的源码,大致可以分为 ABCD 四个部分,下面对 ABCD 四个部分做简要的解释,随后复现源码并详细分析:

A.sketch 简要

sampleSize 为总采样数量,sampleSizePerPartition 为每个 partition 的采样数量,这两个数字的计算都与 RangePartition 的传参有关。这里最关键的就是 sketch 过程,可以理解为简要采样,粗略采样都可以,这一步主要实现从 partition 进行分区 key 的粗采样过程。

B.addAndFilter 添加与过滤

这一部分基于 sketch 生成的采样数据进行筛选,根据 fraction 和 sampleSize 确定是否有数据倾斜即数据不匀的 partition 存在,对于倾斜的 partition 记录其 partitionId,正常的数据则加入 candidate 候选集准备后续的采样。

C.reSample 重采样

最后一步首先判断是否有倾斜分区,有倾斜分区则使用 PruningRdd 进行 resample 重采样,PruningRdd 我们之前也单独讲解过,其主要应用于选取对应 parition 运行任务,避免启动过多 task,详情可以参考 Spark - PartitionPruningRDD 详解

D.genBounds 分区数组生成

重采样后,调用 determineBounds 生成最终的 rangeBounds。

三.源码复现

经过上面的粗略分析,这里再重复一下 rangeBounds 的生成过程:

A.sketch - 粗略采样

B.addAndFilter - 筛选合规与不合规数据

C.resample - 重采样

D.genBounds 分区数组生成

复现前首先初始化一个 RDD 供后续使用:

val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")
    val oriPartitions = 5
    val randomArray = (0 to 100000).toArray.toList
    val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex
    val rdd = sc.parallelize(info).repartition(oriPartitions)

image.gif

local 模式下将 100000 个数据分配到 5 个 partition 上,我们的目标是模拟 RangePartition 生成 rangeBounds 使得数据大致分配到 10 个 partition 上,即 RangeBounds 的长度为 9。Rdd 的形式为 RDD[(Int, Int)]。

1.sketch 简采样

// 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。
      //强制转换为双精度,以避免int或long溢出
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      println(s"SampleSize: $sampleSize")
      // 假设输入分区大致平衡,并且有点过采样。
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt
      println(s"SampleSizePerPartition: $sampleSizePerPartition")

image.gif

首先根据源码的信息以及我们的待分区信息确定 sampleSize 与 sampleSizePerpartition,其中 samplePointsPerPartitionHint = 20 为 RangePartition 内的默认参数,我们也可以自定义修改。上述需求将 partition=5 转换到 partition=10,所以 partitions=10,通过计算可以得到如下信息:

image.gif编辑

其中 sampleSizePerPartition 做了 *3 的处理,因此有一定的过采样情况出现,经过计算要采样 200 条样本,其中每个 partition 大致提供 200 条样本。

A.sketch API

image.gif编辑

B.自定义 sketch

def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = {
    val shift = rdd.id
    // val classTagK=classTag[K]
    // 避免序列化整个partitioner对象
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // 还返回输入大小的水库采样实现
      val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    sketched.foreach(x => {
      println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}")
    })
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

image.gif

基本是把上面的照搬,下面基本解释下实验过程,mapPartitionWithIndex 获得的 idx 为对应 partition 的分区 id,可以通过 TaskContext.getPartitionId() 获取,iter 为对应 rdd 的迭代器,其形式为 Iterator[T],对应本例为 RDD[Int],首先通过 idx 计算随机 seed,根据 idx 生成随机 seed 是避免所有 partition 采用相同 seed 采样造成不泛化的采样结果。随后调用 reservoirSampleAndCount 水库采样法对 partition 内的数据进行采样并通过 Iterator 返回,采样返回的三元组分别为 partitionId,partition内数据总量,partition内采样数据量。 numItems 为当前 parititon 下的总样本数,所以 sketch 函数最后返回的数据为 (numItems,sketched) 即 (采样 RDD 总数据量,(partitionId,partition 内数据量,partitions 内采样数据量))。

2.reservoirSampleAndCount 水池采样

水库采样法

A.reservoirSample API

image.gif编辑

B.自定义 reservoirSample

image.gif编辑

这里将原函数照搬并添加一些注释与修改,用于配合自定义的 sketch 使用,水池采样大致分为三步,假设 parition 内数据量为 n,需采样 sampleSizePerPartition 条样本即 k,下面大致讲解下过程:

A.填满水池

首先将 n 条数据的前 k 条数据放入水池中填满

B.水源不足 n < k

这里如果 parititon 内 n 条数据不足以填满容量为 k 的水池,则只能返回 n 条数据了(n < k)

C.水源充足 n >= k

n > k 时先固定好水池中的 k 条数据,此时消费完第 l 条数据,所以满足均匀分布 uniform[0, l] 中选取前 k 的范围,所以使用 random 随机数生成器生成数据后需要判断是否在 k/l 的概率范围内,只有落在 k 内才会采样并替换水池中对应索引的元素,所以采样过程可以理解为一定概率换水。最终采样完毕后返回 reservior: Array[T](k),即包含 k 个元素的类型 T 的 key。

简单画个示意图:

image.gif编辑

3.AddAndFilter 添加与过滤

经过 sketch 简采样和水池采样后得到 (numItems,sketched)

numItems:采样 RDD 总数据量

sketched: (partitionId,partition 内数据量,partition 内采样数据量)

下面基于上述信息进行 candidate 候选集的添加与筛选

A.AddAndFilter API

image.gif编辑

B.自定义 AddAndFilter  

image.gif编辑

numItems 为 0 代表 RDD 为空,此时返回空候选集。正常情况下首先计算 fraction,sampleSize/numItems 即我们最一开始根据公式计算得到的,后面将依据该 fraction 与 partition 内的采样情况进行筛选:

遍历 sketch 的三元组,其中 n 为 partition 内数据量,sample 为 partition 内采样数据量,如果 n * fraction > sampleSizePerPartition 则代表当前 idx 对应的 partition 内数据超过平均值,此时将 idx 加入 imbalancedPartitions 的 Set 中,后续需要重新采样,如果 partition 内正常的话,则计算 weight,weight 为每条数据的采样间隔 = 1/ sampleRatio,其中 sampleRatio = sample / n,将 (key, weight) 添加到候选集数组中。

4.resample 重采样

对于数据倾斜的分区 partition,

A.resample API

image.gif编辑

B.自定义 resample

image.gif编辑

原封不动的粘贴下来,PartitionPruningRDD 上一篇文章提到过,imbalancedPartitions 内包含了数据倾斜的 partition 对应的 idx,使用 rdd 自带的 sample 方法,按最初计算的 fraction 进行采样,随后将 (key, weight) 添加至 candidate 候选集。

5.genBounds 生成边界

待数据倾斜的分区 sample 完毕后,候选集 candidates 也准备完毕,根据 (key, weight) 即可生成最终的 rangeBounds:

A.genBounds API

image.gif编辑

B.自定义 genBounds

image.gif编辑

代码比较长,但是整体理解不难,首先通过隐函数 implict 进行 candidate _._1 的排序,这里可以使 K 继承 Ordering 类,也可以像例子中的 Int,String 一样,自带默认 Ordering 属性进行排序,生成最终 bounds 需要两个辅助变量 i、j,变量 i 负责控制 candidates 候选集的所有 _._1 key,j 负责控制循环退出条件,即 bounds 达到 partitions - 1 即退出循环。每超过步长 step,将对应对应的 key 添加至 Bounds,随后将 target 增加一个步长 step。ordering.gt 则是为了防止出现重复值,最终 bounds.toArray 生成最终的 bounds。

6.RangeBounds 生成完整代码

mport org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.random.XORShiftRandom
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.hashing.byteswap32
object TestRange {
  def determineBounds[K : Ordering : ClassTag](candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]] // 也可以自定义隐函数支持 sort 逻辑
    val ordered = candidates.sortBy(_._1) // 根据 key-weight 的 key 进行排序
    val numCandidates = ordered.size // 候选集大小
    val sumWeights = ordered.map(_._2.toDouble).sum // 权重总和
    val step = sumWeights / partitions // 分权重
    // 初始化 weight、步长以及空的边界,K 与 key 的类型保持一致
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0 // 候选集元素遍历量
    var j = 0 // 当前 boundary 的元素
    var previousBound = Option.empty[K]
    // j = partitions - 1时退出,因为 n-1 个边界可以划定n个区域
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight // weight 累加
      if (cumWeight >= target) {
        // Skip duplicate values. 跳过重复值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key // 添加值
          target += step // 进化到下一个 step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
  def reservoirSampleAndCount[T: ClassTag](
                                            input: Iterator[T],
                                            k: Int,
                                            seed: Long = Random.nextLong())
  : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    // 放置前 K 个元素
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }
    // If we have consumed all the elements, return them. Otherwise do the replacement.
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      var l = i.toLong
      val rand = new XORShiftRandom()
      //      val rand = new Random()
      rand.setSeed(seed)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // There are k elements in the reservoir, and the l-th element has been
        // consumed. It should be chosen with probability k/l. The expression
        // below is a random long chosen uniformly from [0,l)
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }
  def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = {
    val shift = rdd.id
    // val classTagK=classTag[K]
    // 避免序列化整个partitioner对象
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // 还返回输入大小的水库采样实现
      val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    sketched.foreach(x => {
      println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}")
    })
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")
    val oriPartitions = 5
    val randomArray = (0 to 100000).toArray.toList
    val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex
    val rdd = sc.parallelize(info).repartition(oriPartitions)
    val partitions = 10 // 待分区数目
    val samplePointsPerPartitionHint: Int = 20 // 采样数
    val rddPartitionsLength = rdd.partitions.length // 分区数
    println(s"OriPartition: $oriPartitions RangeToPartition: $partitions")
    var candidatesList: Array[Int] = Array.empty[Int]
    if (partitions <= 1) {
      Array.empty[Int]
    } else {
      // 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。
      //强制转换为双精度,以避免int或long溢出
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      println(s"SampleSize: $sampleSize")
      // 假设输入分区大致平衡,并且有点过采样。
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt
      println(s"SampleSizePerPartition: $sampleSizePerPartition")
      val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        candidatesList = Array.empty[Int]
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 如果一个分区包含的项目数远远超过平均数,我们将从中重新采样
        // 以确保从该分区收集足够的项。
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        println(s"sampleSize: $sampleSize numItems: $numItems fraction $fraction")
        val candidates = ArrayBuffer.empty[(Int, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 远超平均数则重新采样,将对应 id 加入 imbalancedPartitions
            println(s"id: $idx n: $n sample: ${sample.length} fraction: ${fraction * n}")
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            // 权重为抽样概率的1
            val weight = (n.toDouble / sample.length).toFloat
            println(s"weight: $weight n: $n sample: ${sample.length}")
            for (key <- sample) {
              // 加入候选集
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 以所需的采样概率重新采样不平衡分区。
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // candidates: [key-weight] partitions: 自定义 range 分区 candidates.size: 候选集大小
        candidatesList = determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
    println("候选集长度:" + candidatesList.length)
    println(candidatesList.mkString(" | "))
  }
}

image.gif

通过粘贴复制源码,并做适当修改后得到了上述生成 RangeBounds 的类源码,下面分别采用 shuffle 后的数组,长度分别为 10000、100000、1000000 进行 range = 10 的生成。

10000:

1124 | 2183 | 3055 | 3991 | 4946 | 6075 | 7068 | 8087 | 9154

image.gif

100000:

10624 | 19696 | 28600 | 37147 | 46209 | 56248 | 67644 | 78822 | 88162

image.gif

100000:

101034 | 169888 | 270260 | 379722 | 473229 | 579174 | 720440 | 813207 | 907043

image.gif

随着 partition 内数据的增加,采样的准确度也会受到相应的影响,所以我们使用 RangePartition 时经常会得到不那么均匀的分区。

四.总结

RangePartition 在工业场景下使用的次数并不多,但是通过 RangePartition 源码的 bounds 生成可以学习到以下知识点,还是非常的奈斯。

sketch: 粗采样,用于对各个 partition 内的 key 进行采样

reservoirSampleAndCount: 水库采样

byteswap32: 生成随机数 seed

PartitionPruningRdd: 生成对应 partition 的简易 RDD

sample: rdd 采样方法

XORShiftRandom: 一种快速的随机数生成算法 [后续会单独进行推理与代码验证]

determineBounds: 根据候选集生成对应长度的 bounds

学习 bounds 的生成同时也为大规模分布式数据下的数据采样提供了思路,不论最终是否要生成有序的采样 key,水库采样、随机数生成、PartitionPruningRdd、快速随机数生成方法等都值得借鉴。

目录
相关文章
|
分布式计算
Spark PruneDependency 依赖关系 RangePartitioner
Spark PruneDependency 依赖关系 RangePartitioner Represents a dependency between the PartitionPruningRDD and its parent.
954 0
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
161 0
|
20天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
4月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
3天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
8天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
3月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
130 0
|
4月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
5月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
111 0
|
5月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
102 0