Spark - RangePartitioner rangeBounds 生成 源码分析 & 实践

简介: 本文主要探索RangePartitioner 源码中rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。

一.引言

Spark - 一文搞懂 Partitioner 一文中介绍了 Spark Partitioner 内几种常见的 Partition 分区函数,HashPartitioner 使用 Hash 算法进行分区,而 RangePartitioner 则是对全局的 partition 进行采样获取 keyList,随后构造 rangeBounds 将 key 尽可能的按范围分到新的分区内,本文主要探索 RangePartitioner 源码中 rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。

二.源码分析

1.RangePartitioner

RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。

image.gif编辑

RangePartition 通过原始 rdd 的 partition 信息采样获取 rangeBounds: Array[K],其中 K 为分区 key 对应的数据类型, partitions 参数为待分区数目,通常情况下 rangeBounds 的长度为 partitions - 1,因为 n-1 个边界可以分出 n 个区域。

image.gif编辑

后面 getPartition 中 key: Any 对上上述 rangeBounds 的类型 K,这里只是根据数组长度决定采用暴力遍历法还是二分查找法,与之前提到的 Scala - 数值型特征分桶 有异曲同工之妙,有兴趣的同学可以看看。

2.rangeBounds

从上面 RangePartition 的源码就可以看出其实现 range 分区全依赖 rangeBounds 的生成,下面先简单看下 rangeBounds 生成的源码,后续会使用 demo 复现源码实现步骤:

image.gif编辑

上面是 rangeBounds 生成的源码,大致可以分为 ABCD 四个部分,下面对 ABCD 四个部分做简要的解释,随后复现源码并详细分析:

A.sketch 简要

sampleSize 为总采样数量,sampleSizePerPartition 为每个 partition 的采样数量,这两个数字的计算都与 RangePartition 的传参有关。这里最关键的就是 sketch 过程,可以理解为简要采样,粗略采样都可以,这一步主要实现从 partition 进行分区 key 的粗采样过程。

B.addAndFilter 添加与过滤

这一部分基于 sketch 生成的采样数据进行筛选,根据 fraction 和 sampleSize 确定是否有数据倾斜即数据不匀的 partition 存在,对于倾斜的 partition 记录其 partitionId,正常的数据则加入 candidate 候选集准备后续的采样。

C.reSample 重采样

最后一步首先判断是否有倾斜分区,有倾斜分区则使用 PruningRdd 进行 resample 重采样,PruningRdd 我们之前也单独讲解过,其主要应用于选取对应 parition 运行任务,避免启动过多 task,详情可以参考 Spark - PartitionPruningRDD 详解

D.genBounds 分区数组生成

重采样后,调用 determineBounds 生成最终的 rangeBounds。

三.源码复现

经过上面的粗略分析,这里再重复一下 rangeBounds 的生成过程:

A.sketch - 粗略采样

B.addAndFilter - 筛选合规与不合规数据

C.resample - 重采样

D.genBounds 分区数组生成

复现前首先初始化一个 RDD 供后续使用:

val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")
    val oriPartitions = 5
    val randomArray = (0 to 100000).toArray.toList
    val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex
    val rdd = sc.parallelize(info).repartition(oriPartitions)

image.gif

local 模式下将 100000 个数据分配到 5 个 partition 上,我们的目标是模拟 RangePartition 生成 rangeBounds 使得数据大致分配到 10 个 partition 上,即 RangeBounds 的长度为 9。Rdd 的形式为 RDD[(Int, Int)]。

1.sketch 简采样

// 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。
      //强制转换为双精度,以避免int或long溢出
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      println(s"SampleSize: $sampleSize")
      // 假设输入分区大致平衡,并且有点过采样。
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt
      println(s"SampleSizePerPartition: $sampleSizePerPartition")

image.gif

首先根据源码的信息以及我们的待分区信息确定 sampleSize 与 sampleSizePerpartition,其中 samplePointsPerPartitionHint = 20 为 RangePartition 内的默认参数,我们也可以自定义修改。上述需求将 partition=5 转换到 partition=10,所以 partitions=10,通过计算可以得到如下信息:

image.gif编辑

其中 sampleSizePerPartition 做了 *3 的处理,因此有一定的过采样情况出现,经过计算要采样 200 条样本,其中每个 partition 大致提供 200 条样本。

A.sketch API

image.gif编辑

B.自定义 sketch

def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = {
    val shift = rdd.id
    // val classTagK=classTag[K]
    // 避免序列化整个partitioner对象
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // 还返回输入大小的水库采样实现
      val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    sketched.foreach(x => {
      println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}")
    })
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

image.gif

基本是把上面的照搬,下面基本解释下实验过程,mapPartitionWithIndex 获得的 idx 为对应 partition 的分区 id,可以通过 TaskContext.getPartitionId() 获取,iter 为对应 rdd 的迭代器,其形式为 Iterator[T],对应本例为 RDD[Int],首先通过 idx 计算随机 seed,根据 idx 生成随机 seed 是避免所有 partition 采用相同 seed 采样造成不泛化的采样结果。随后调用 reservoirSampleAndCount 水库采样法对 partition 内的数据进行采样并通过 Iterator 返回,采样返回的三元组分别为 partitionId,partition内数据总量,partition内采样数据量。 numItems 为当前 parititon 下的总样本数,所以 sketch 函数最后返回的数据为 (numItems,sketched) 即 (采样 RDD 总数据量,(partitionId,partition 内数据量,partitions 内采样数据量))。

2.reservoirSampleAndCount 水池采样

水库采样法

A.reservoirSample API

image.gif编辑

B.自定义 reservoirSample

image.gif编辑

这里将原函数照搬并添加一些注释与修改,用于配合自定义的 sketch 使用,水池采样大致分为三步,假设 parition 内数据量为 n,需采样 sampleSizePerPartition 条样本即 k,下面大致讲解下过程:

A.填满水池

首先将 n 条数据的前 k 条数据放入水池中填满

B.水源不足 n < k

这里如果 parititon 内 n 条数据不足以填满容量为 k 的水池,则只能返回 n 条数据了(n < k)

C.水源充足 n >= k

n > k 时先固定好水池中的 k 条数据,此时消费完第 l 条数据,所以满足均匀分布 uniform[0, l] 中选取前 k 的范围,所以使用 random 随机数生成器生成数据后需要判断是否在 k/l 的概率范围内,只有落在 k 内才会采样并替换水池中对应索引的元素,所以采样过程可以理解为一定概率换水。最终采样完毕后返回 reservior: Array[T](k),即包含 k 个元素的类型 T 的 key。

简单画个示意图:

image.gif编辑

3.AddAndFilter 添加与过滤

经过 sketch 简采样和水池采样后得到 (numItems,sketched)

numItems:采样 RDD 总数据量

sketched: (partitionId,partition 内数据量,partition 内采样数据量)

下面基于上述信息进行 candidate 候选集的添加与筛选

A.AddAndFilter API

image.gif编辑

B.自定义 AddAndFilter  

image.gif编辑

numItems 为 0 代表 RDD 为空,此时返回空候选集。正常情况下首先计算 fraction,sampleSize/numItems 即我们最一开始根据公式计算得到的,后面将依据该 fraction 与 partition 内的采样情况进行筛选:

遍历 sketch 的三元组,其中 n 为 partition 内数据量,sample 为 partition 内采样数据量,如果 n * fraction > sampleSizePerPartition 则代表当前 idx 对应的 partition 内数据超过平均值,此时将 idx 加入 imbalancedPartitions 的 Set 中,后续需要重新采样,如果 partition 内正常的话,则计算 weight,weight 为每条数据的采样间隔 = 1/ sampleRatio,其中 sampleRatio = sample / n,将 (key, weight) 添加到候选集数组中。

4.resample 重采样

对于数据倾斜的分区 partition,

A.resample API

image.gif编辑

B.自定义 resample

image.gif编辑

原封不动的粘贴下来,PartitionPruningRDD 上一篇文章提到过,imbalancedPartitions 内包含了数据倾斜的 partition 对应的 idx,使用 rdd 自带的 sample 方法,按最初计算的 fraction 进行采样,随后将 (key, weight) 添加至 candidate 候选集。

5.genBounds 生成边界

待数据倾斜的分区 sample 完毕后,候选集 candidates 也准备完毕,根据 (key, weight) 即可生成最终的 rangeBounds:

A.genBounds API

image.gif编辑

B.自定义 genBounds

image.gif编辑

代码比较长,但是整体理解不难,首先通过隐函数 implict 进行 candidate _._1 的排序,这里可以使 K 继承 Ordering 类,也可以像例子中的 Int,String 一样,自带默认 Ordering 属性进行排序,生成最终 bounds 需要两个辅助变量 i、j,变量 i 负责控制 candidates 候选集的所有 _._1 key,j 负责控制循环退出条件,即 bounds 达到 partitions - 1 即退出循环。每超过步长 step,将对应对应的 key 添加至 Bounds,随后将 target 增加一个步长 step。ordering.gt 则是为了防止出现重复值,最终 bounds.toArray 生成最终的 bounds。

6.RangeBounds 生成完整代码

mport org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.random.XORShiftRandom
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.hashing.byteswap32
object TestRange {
  def determineBounds[K : Ordering : ClassTag](candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]] // 也可以自定义隐函数支持 sort 逻辑
    val ordered = candidates.sortBy(_._1) // 根据 key-weight 的 key 进行排序
    val numCandidates = ordered.size // 候选集大小
    val sumWeights = ordered.map(_._2.toDouble).sum // 权重总和
    val step = sumWeights / partitions // 分权重
    // 初始化 weight、步长以及空的边界,K 与 key 的类型保持一致
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0 // 候选集元素遍历量
    var j = 0 // 当前 boundary 的元素
    var previousBound = Option.empty[K]
    // j = partitions - 1时退出,因为 n-1 个边界可以划定n个区域
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight // weight 累加
      if (cumWeight >= target) {
        // Skip duplicate values. 跳过重复值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key // 添加值
          target += step // 进化到下一个 step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
  def reservoirSampleAndCount[T: ClassTag](
                                            input: Iterator[T],
                                            k: Int,
                                            seed: Long = Random.nextLong())
  : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    // 放置前 K 个元素
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }
    // If we have consumed all the elements, return them. Otherwise do the replacement.
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      var l = i.toLong
      val rand = new XORShiftRandom()
      //      val rand = new Random()
      rand.setSeed(seed)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // There are k elements in the reservoir, and the l-th element has been
        // consumed. It should be chosen with probability k/l. The expression
        // below is a random long chosen uniformly from [0,l)
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }
  def sketch(rdd: RDD[Int], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[Int])]) = {
    val shift = rdd.id
    // val classTagK=classTag[K]
    // 避免序列化整个partitioner对象
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // 还返回输入大小的水库采样实现
      val (sample, n) = reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    sketched.foreach(x => {
      println(s"idx:${x._1} n:${x._2} sampleSize: ${x._3.length} sample:${x._3.take(5).mkString(",")}")
    })
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[5]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")
    val oriPartitions = 5
    val randomArray = (0 to 100000).toArray.toList
    val info = scala.util.Random.shuffle(randomArray).toArray.zipWithIndex
    val rdd = sc.parallelize(info).repartition(oriPartitions)
    val partitions = 10 // 待分区数目
    val samplePointsPerPartitionHint: Int = 20 // 采样数
    val rddPartitionsLength = rdd.partitions.length // 分区数
    println(s"OriPartition: $oriPartitions RangeToPartition: $partitions")
    var candidatesList: Array[Int] = Array.empty[Int]
    if (partitions <= 1) {
      Array.empty[Int]
    } else {
      // 这是我们需要的样本大小,需要大致平衡的输出分区,上限为1M。
      //强制转换为双精度,以避免int或long溢出
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      println(s"SampleSize: $sampleSize")
      // 假设输入分区大致平衡,并且有点过采样。
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionsLength).toInt
      println(s"SampleSizePerPartition: $sampleSizePerPartition")
      val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        candidatesList = Array.empty[Int]
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 如果一个分区包含的项目数远远超过平均数,我们将从中重新采样
        // 以确保从该分区收集足够的项。
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        println(s"sampleSize: $sampleSize numItems: $numItems fraction $fraction")
        val candidates = ArrayBuffer.empty[(Int, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 远超平均数则重新采样,将对应 id 加入 imbalancedPartitions
            println(s"id: $idx n: $n sample: ${sample.length} fraction: ${fraction * n}")
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            // 权重为抽样概率的1
            val weight = (n.toDouble / sample.length).toFloat
            println(s"weight: $weight n: $n sample: ${sample.length}")
            for (key <- sample) {
              // 加入候选集
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 以所需的采样概率重新采样不平衡分区。
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // candidates: [key-weight] partitions: 自定义 range 分区 candidates.size: 候选集大小
        candidatesList = determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
    println("候选集长度:" + candidatesList.length)
    println(candidatesList.mkString(" | "))
  }
}

image.gif

通过粘贴复制源码,并做适当修改后得到了上述生成 RangeBounds 的类源码,下面分别采用 shuffle 后的数组,长度分别为 10000、100000、1000000 进行 range = 10 的生成。

10000:

1124 | 2183 | 3055 | 3991 | 4946 | 6075 | 7068 | 8087 | 9154

image.gif

100000:

10624 | 19696 | 28600 | 37147 | 46209 | 56248 | 67644 | 78822 | 88162

image.gif

100000:

101034 | 169888 | 270260 | 379722 | 473229 | 579174 | 720440 | 813207 | 907043

image.gif

随着 partition 内数据的增加,采样的准确度也会受到相应的影响,所以我们使用 RangePartition 时经常会得到不那么均匀的分区。

四.总结

RangePartition 在工业场景下使用的次数并不多,但是通过 RangePartition 源码的 bounds 生成可以学习到以下知识点,还是非常的奈斯。

sketch: 粗采样,用于对各个 partition 内的 key 进行采样

reservoirSampleAndCount: 水库采样

byteswap32: 生成随机数 seed

PartitionPruningRdd: 生成对应 partition 的简易 RDD

sample: rdd 采样方法

XORShiftRandom: 一种快速的随机数生成算法 [后续会单独进行推理与代码验证]

determineBounds: 根据候选集生成对应长度的 bounds

学习 bounds 的生成同时也为大规模分布式数据下的数据采样提供了思路,不论最终是否要生成有序的采样 key,水库采样、随机数生成、PartitionPruningRdd、快速随机数生成方法等都值得借鉴。

目录
相关文章
|
SQL 分布式计算 Java
Spark入门指南:从基础概念到实践应用全解析
在这个数据驱动的时代,信息的处理和分析变得越来越重要。而在众多的大数据处理框架中, Apache Spark 以其独特的优势脱颖而出。
168 0
|
5月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
438 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
235 0
|
5月前
|
分布式计算 Hadoop Serverless
数据处理的艺术:EMR Serverless Spark实践及应用体验
阿里云EMR Serverless Spark是基于Spark的全托管大数据处理平台,融合云原生弹性与自动化,提供任务全生命周期管理,让数据工程师专注数据分析。它内置高性能Fusion Engine,性能比开源Spark提升200%,并有成本优化的Celeborn服务。支持计算存储分离、OSS-HDFS兼容、DLF元数据管理,实现一站式的开发体验和Serverless资源管理。适用于数据报表、科学项目等场景,简化开发与运维流程。用户可通过阿里云控制台快速配置和体验EMR Serverless Spark服务。
|
6月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
378 1
|
6月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
7月前
|
分布式计算 Shell 开发工具
Spark编程实验二:RDD编程初级实践
Spark编程实验二:RDD编程初级实践
379 1
|
分布式计算 Kubernetes Serverless
Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
|
SQL JSON 分布式计算
提高数据的安全性和可控性,数栈基于 Ranger 实现的 Spark SQL 权限控制实践之路
在企业级应用中,数据的安全性和隐私保护是极其重要的,为了实现Spark SQL 对数据的精细化管理及提高数据的安全性和可控性,数栈基于 Apache Ranger 实现了 Spark SQL 对数据处理的权限控制,本文将重点描述数栈如何基于 Ranger 赋予了 Spark SQL 在权限管控方面,更强的管控力度、更丰富的能力。
265 0
|
SQL 分布式计算 Java
五、【计算】Spark原理与实践(下) | 青训营笔记
五、【计算】Spark原理与实践(下) | 青训营笔记
五、【计算】Spark原理与实践(下) | 青训营笔记