Spark - 一文搞懂 Partitioner

简介: spark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理,一个 partition 内可能包含一个文件或者多个文件的内容,Partitioner 可以基于 pairRDD 的 key 实现自定义partition 的内容。

一.引言

spark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理,一个 partition 内可能包含一个文件或者多个文件的内容,Partitioner 可以基于 pairRDD 的 key 实现自定义 partition 的内容。

image.gif编辑

Partitioner 函数最基本的两个方法是 numPartitions 和 getPartition(key: Any):

A.numPartitions: 获取总的分区数

B.getPartition:

根据 key 获取当前 partition 对应的分区数目,范围在 [0, numPartitions-1],这里的 partitionId 与 TaskContext.getPartitionId 的值一致,通过 hash(key) 得到 int 的 partitionNum 变量,相同 partitonNum 的 key 对应的 paidRDD 将分到同一个 partition 内处理

常见的 Partition 分区类型有如下几种:

分区函数 分区方法
HashPartitioner 根据 hash(key) 分区
RangePartitioner 根据 Range 边界分区
Partitioner 根据自定义规则分区

二.HashPartitioner

1.源码分析

hashPartitioner 基于 Object.hashcode % partitionNum 进行分区,需要注意 partitionNum 的值是需要 >= 0 的,partiionNum 的获取通过 getPartition 函数内的 nonNegativeMod 函数实现

image.gif编辑

nonNegativeMod 在实现 hashCode % partitionNum 的基础上增加了非负性的要求,因为 partitionNum 是大于等于 0 的数目:

image.gif编辑

2.代码测试

val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    val partitionNum = 5
    testRdd.partitionBy(new HashPartitioner(partitionNum)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
      }
    })

image.gif

这里将 0-499 共 500 个数字 zipWithIndex 生成 pairRdd 并通过 HashPartitioner 生成 5 个 Partition,通过 TaskContext 获取 partitionId,为了日志一一打印,这里采用 local[1] 的配置 :

val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[1]")

image.gif

可以看到红框内同一个 TaskId 对应的 partition 内的 key 都具有相同的 mod 值,所以被分到同一分区。

image.gif编辑

3.repartition

正常使用的 repartition 函数采用 HashPartitioner 函数作为默认分区函数,下面尝试一下:

println("=============================repartition=============================")
    testRdd.repartition(5).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
      }
    })

image.gif

与上面不同的是 taskId 有区别,但是相同 mod 的 key 仍然会分到同一分区:

image.gif编辑

三.RangePartitioner

1.源码分析

RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。

image.gif编辑

除了 partitions 的参数外,RangePartitioner 还需要将待分区的 rdd 传入供随机采样生成 rangeBounds 使用,相比于 HashPartition 直接 hashCodes % partitionNum 的操作,RangePartitioner 分区共分两步:

A.获取分区 Boundary

需要采样的分区样本大小上线为 1m,转换为 double 避免精度溢出,第一个 else 逻辑内考虑如果一个分区内包含的项目数远远超过平均数,则从中重新采样,以确保该分区能够收集到足够的采样数目,最下面的 if 函数使用所需的采样概率对不平衡分区重新采样,最终得到分区的边界,这里可以抽空单独拎出来研究一下。举个例子大致理解下,假如所有 partition 内的 key 的范围是 0-500,随机生成5个分区,则生成 101-203-299-405 这样的区间,每一个数字代表其分区的上界,例如分区0的上界为 101,分区1的上界为 203,依次类推,最终生成 5 个分区。

image.gif编辑

B.根据Boundary获取分区

如果分区数组长度不大于 128,则进行简单的暴力循环搜索,如果超过 128,则进行二分查找,同时提供根据 ascending 参数决定 partitionId 的顺序或逆序。这与之前 Spark-Scala 数据特征分桶时采用的优化策略一致,有兴趣可以看看:Scala - 数值型特征分桶

image.gif编辑

2.代码测试

val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    testRdd.partitionBy(new RangePartitioner(5, testRdd)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
      }
    })

image.gif

依然使用 500 个纯数字 RDD 进行 range 分区的测试,为了验证大致均分的思想,这里最后不再打印 mod 结果,转而打印每个 partition 内元素的数量,可以看到这次每组数量不像之前 HashPartitioner 得到的一样均匀,而是介于 500/5=100 的上下,但是总数为 500。

image.gif编辑

四.SelfPartitioner

1.源码分析

自定义 Partitioner 主要实现下述两个功能,上面也提到了,再简单补充下:

numPartitions: 获取总的分区数

getPartition: 获取 key 对应的分区 id

image.gif编辑

2.代码实现

A.SelfPartitioner

基于上面 RangePartitioner 分区不均匀的情况,我们采用 SelfParitioner 自定义分区的方式实现均匀分区,这里偷懒直接生成了对应的上界 boundary,实际场景中 boundary 应该基于 partitionNum 的数量动态生成,getPartition 函数内引入了 break 机制,不熟悉的同学可以移步:Scala - 优雅的break,随后就是基础的暴力循环,如果找到上界则返回上界对应的 index 作为分区 id。

import scala.util.control.Breaks._
    class SelfPartition(partitionNum: Int) extends Partitioner {
      val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex
      override def numPartitions: Int = partitionNum
      override def getPartition(key: Any): Int = {
        val keyNum = key.toString.toInt
        var partitionNum = 0
        breakable(
          boundary.foreach(bound => {
            if (keyNum < bound._1) {
              partitionNum = bound._2
              break()
            }
          })
        )
        partitionNum
      }
    }

image.gif

B.运行结果

val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    testRdd.partitionBy(new SelfPartition(5)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val info = partition.toArray.map(_._1)
        val taskId = TaskContext.getPartitionId()
        info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
      }
    })

image.gif

通过 SelfPartitioner 分区后可以看到 0-499 共 500 个元素被均匀分配到 5 个 Partition 内,除了最简单的 boundary 方法分区外,也可以自定义 hash 方法,key 的类型默认为 Any,如果 key 不是 scala 的基本数据类型,则使用 key.asInstanceOf[T] 转换即可。

image.gif编辑

五.repartitionAndSortWithinPartitions

1.函数分析

image.gif编辑

除了正常的分区需求外,spark 还提供 repartitionAndSortWithinPartitions 函数,该函数根据给定的分区器 Partitioner 进行分区划分得到新的 RDD,并根据每个键进行排序,使得 RDD 中的数据保持一定顺序,该方法比 repartition + sorting 更加高效,因为它把排序机制放入了 shuffle 的过程中。

image.gif编辑

源码中该方法位于 OrderedRddFunctions 类内,只支持传入分区函数 Partitioner,ordering 排序规则需要使用 implict 传入隐函数的方法定义:

image.gif编辑

对于需要分区的 key: Any,需要定义隐函数保证其实现 Ordering 接口才能实现分区后排序,否则只能分区没有排序。

2.代码实现

A.分区排序依据

在分区函数的基础上,增加了 Ordering 隐函数,这里 Partitioner 函数仍然负责根据 key 得到分区 Id,和上面不同的时,分区的同时对 rdd 进行 shuffle,其中 order 的规则由隐函数给出,这里通过比较二者的分数进行排序,如果想要逆序只需要添加负号即可 -(x.score - y.score)。

// 学生类
  case class Student(name: String, score: Int)
  // 隐函数-Ordering
  implicit object StudentOrdering extends Ordering[Student] {
    override def compare(x: Student, y: Student): Int = {
      x.score - y.score
    }
  }
  class SelfSortPartition(partitionNum: Int) extends Partitioner {
    val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex
    override def numPartitions: Int = partitionNum
    override def getPartition(key: Any): Int = {
      val stuKey = key.asInstanceOf[Student]
      val keyNum = stuKey.name.toInt
      var partitionNum = 0
      breakable(
        boundary.foreach(bound => {
          if (keyNum < bound._1) {
            partitionNum = bound._2
            break()
          }
        })
      )
      partitionNum
    }
  }

image.gif

B.主函数

这里使用 0-499 的 String 类型作为学生的编号,Score 则采取 math.random x 100 进行模拟,分区使用 Student 的 name id,所以每个元素的分区不变,变的是每个元素的顺序。

println("=============================SortPartition=============================")
    val studentRdd = sc.parallelize((0 until 500).toArray.map(num => (Student(num.toString, (math.random * 100).toInt), true)))
    studentRdd.take(5).foreach(println(_))
    studentRdd.repartitionAndSortWithinPartitions(new SelfSortPartition(5)).foreachPartition(partition => {
      if (partition.nonEmpty) {
        val taskId = TaskContext.getPartitionId()
        println("===========================")
        partition.toArray.take(5).map(stu => {
          println(s"TaskId: ${taskId} Name: ${stu._1.name} Score: ${stu._1.score}")
        })
      }
    }

image.gif

由于使用 x.score - y.score 顺序计数,所以按分数从小到大排序:

image.gif编辑

这一届是带过最差的学生,咋还能考0分。

C.其他写法

除了 StudentOrdering 的写法,也可以采用直接 Object Student 的写法,这里 A <: Student 表示任何 A 的子类都支持该隐式调用,关于 <: 相关知识可以参考:Scala Generic 泛型类详解 - T

object Student {
    implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
      Ordering.by(stu => stu.score)
    }
  }
  implicit object StudentOrdering extends Ordering[Student] {
    override def compare(x: Student, y: Student): Int = {
      x.score - y.score
    }
  }

image.gif

如果想要支持多重排序,可以在元祖内增加多个字段,会优先比较 name 再比较 score,以此类推,同理如果想要逆序,例如根据分数逆序排列,则改为 (stu.name,-1 * stu.score)

object Student {
    implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
      Ordering.by(stu => (stu.name, stu.score))
    }
  }

image.gif

如果对应的分区 key 没有实现 implict 的比较隐函数,则函数会直接报灰,无法编译:

image.gif编辑

六.总结

Partitioner 的一般用法大致就这些,除了三种 HashPartitioner 函数外,Spark 也可以通过 repartitionAndSortWithinPartitions 实现分区 + 排序的需求,总体来说,Partitioner 支持用户自定义分区规则去规划任务的 task 需要处理什么样的 partition 数据,对于精细化处理和区域化定制的需求十分方便,除此之外,一些需要顺序处理的数据或者顺序存储的数据,通过 SortWithinPartitions 的方法也可以提高效率,非常奈斯👍。最后继续感叹命名的抽象性,partition -  分片、隔墙,现在看到屏风就像看到了 RDD。

image.gif编辑

目录
相关文章
|
分布式计算 Spark 索引
自定义Spark Partitioner提升es-hadoop Bulk效率
这篇文章涉及的调整也是对SparkES 多维分析引擎设计 中提及的一个重要概念“shard to partition ,partition to shard ” 的实现。不过目前只涉及到构建索引那块。
3022 0
|
24天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
69 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
63 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
92 0
|
25天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
50 6
|
23天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
79 2
|
24天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
63 1
|
25天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
53 1
|
1月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
39 1

热门文章

最新文章

下一篇
无影云桌面