Spark 中的 Rebalance 操作以及与Repartition操作的区别

简介: Spark 中的 Rebalance 操作以及与Repartition操作的区别

背景

本文基本spark 3.2.1

Partitioning Hints Types中有提到Rebalance操作以及Repartition操作,而且他们都可以做数据的重分区,他们之间有什么区别呢?

SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;


分析

  • Rebalance
    参考对应的SPARK-35725,其目的是为了在AQE阶段,根据spark.sql.adaptive.advisoryPartitionSizeInBytes进行分区的重新分区,防止数据倾斜。再加上SPARK-35786,就可以根据hint进行重分区。

具体看看怎么实现的,OptimizeSkewInRebalancePartitions代码如下:


    override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
     Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
   ...
    override def apply(plan: SparkPlan): SparkPlan = {
   if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
     return plan
   }
   plan match {
     case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
       tryOptimizeSkewedPartitions(stage)
     case _ => plan
   }
 }

只有开启了 spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled了的情况下,才可以进行分区的expand,而且还得shuffle的来源还得是REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL的情况下才能适用该规则.

tryOptimizeSkewedPartitions 的具体实现可以看代码,该代码的注释很清楚:

* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
*   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
*                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
*                              /                                     \
*   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.

我们分析一下REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL来源:

这是在ResolveHints规则中进行转换的:


   private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
    hint.parameters match {
      case partitionExprs @ Seq(_*) =>
        val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
        if (invalidParams.nonEmpty) {
          val hintName = hint.name.toUpperCase(Locale.ROOT)
          throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
        }
        RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
    }
  }
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
    _.containsPattern(UNRESOLVED_HINT), ruleId) {
    case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
        case "REPARTITION" =>
          createRepartition(shuffle = true, hint)
        case "COALESCE" =>
          createRepartition(shuffle = false, hint)
        case "REPARTITION_BY_RANGE" =>
          createRepartitionByRange(hint)
        case "REBALANCE" if conf.adaptiveExecutionEnabled =>
          createRebalance(hint)
        case _ => hint
      }
  } 

可见只有在AQE开启的情况下 该Rebalance的hint才生效,生成对应的RebalancePartitions逻辑计划,而该逻辑计划会在BasicOperators规则中,转换成ShuffleEchangeExec物理计划:

      case r: logical.RebalancePartitions =>
      val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
        REBALANCE_PARTITIONS_BY_NONE
      } else {
        REBALANCE_PARTITIONS_BY_COL
      }
      exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil

因为只有shuffle操作的时候,AQE阶段才会应用到OptimizeSkewInRebalancePartitions规则,这样才会在shuffle read阶段根据shuffle write阶段的数据进行优化。

注意:

其中 OptimizeShuffleWithLocalRead 不适用 shuffleOrigin为REBALANCE_PARTITIONS_BY_COL的,要不然在动态分区存在小文件的问题,具体见该处讨论


Repartition

相对于Rebalance,该hint只是根据指定的固定的分区数据或者列进行分区,这个时候每个分区的大小并不能控制,只能说是平均分配或者说是按照列进行hash分区(这种情况存在文件大小不一的情况)

具体的分析,可以参考Rebalance的分析。

注意一点的是在SPARK-35650之后,Repartition操作也是在AQE阶段进行优化,而在SPARK-35725 之后,如果是单纯的REPARTITION hint 也是可以达到Rebalace hint的效果,因为在此处把shuffleOrigin从REPARTITION_BY_NONE改成了REBALANCE_PARTITIONS_BY_NONE了,所以也能使用于OptimizeSkewInRebalancePartitions规则。


结论

一般在reparition用到的地方都可以Rebalance来替换,而且Rebalance有更好的文件大小的控制能力,更多的信息可以查看对应的 spark-jira

相关文章
|
2月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
260 1
|
1月前
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
本文详细解析了 Apache Spark 的两种常见部署模式:Standalone 和 YARN。Standalone 模式自带轻量级集群管理服务,适合小规模集群;YARN 模式与 Hadoop 生态系统集成,适合大规模生产环境。文章通过示例代码展示了如何在两种模式下运行 Spark 应用程序,并总结了两者的优缺点,帮助读者根据需求选择合适的部署模式。
63 3
|
2月前
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
【10月更文挑战第5天】随着大数据处理需求的增长,Apache Spark 成为了广泛采用的大数据处理框架。本文详细解析了 Spark Standalone 与 YARN 两种常见部署模式的区别,并通过示例代码展示了如何在不同模式下运行 Spark 应用程序。Standalone 模式自带轻量级集群管理,适合小规模集群或独立部署;YARN 则作为外部资源管理器,能够与 Hadoop 生态系统中的其他应用共享资源,更适合大规模生产环境。文章对比了两者的资源管理、部署灵活性、扩展性和集成能力,帮助读者根据需求选择合适的部署模式。
34 1
|
6月前
|
资源调度 分布式计算 监控
Spark Standalone与YARN的区别?
【6月更文挑战第17天】Spark Standalone与YARN的区别?
354 57
|
4月前
|
缓存 分布式计算 数据处理
|
4月前
|
分布式计算 Serverless 数据处理
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 Hadoop 大数据
Spark与Hadoop的区别?
【6月更文挑战第15天】Spark与Hadoop的区别?
66 8