【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(二)

简介: 【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(二)

ShufflePartitionsUtil.coalescePartition分析(合并分区的核心代码)

coalescePartition如示:

def coalescePartitions(
      mapOutputStatistics: Array[MapOutputStatistics],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
    // `advisoryTargetSize` as the target size of a coalesced task.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // The max at here is to make sure that when we have an empty table, we only have a single
    // coalesced partition.
    // There is no particular reason that we pick 16. We just need a number to prevent
    // `maxTargetSize` from being set to 0.
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
    val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
      s"actual target size $targetSize.")
    // Make sure these shuffles have the same number of partitions.
    val distinctNumShufflePartitions =
      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
    // The reason that we are expecting a single value of the number of shuffle partitions
    // is that when we add Exchanges, we set the number of shuffle partitions
    // (i.e. map output partitions) using a static setting, which is the value of
    // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
    // number of partitions, they will have the same number of shuffle partitions
    // (i.e. map output partitions).
    assert(
      distinctNumShufflePartitions.length == 1,
      "There should be only one distinct value of the number of shuffle partitions " +
        "among registered Exchange operators.")
    val numPartitions = distinctNumShufflePartitions.head
    val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0
    while (i < numPartitions) {
      // We calculate the total size of i-th shuffle partitions from all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }
      // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
      // new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // reset postShuffleInputSize.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }
    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
    partitionSpecs
  }

totalPostShuffleInputSize 先计算出总的shuffle的数据大小

maxTargetSize取max(totalPostShuffleInputSize/minNumPartitions,16)的最大值,minNumPartitions也就是spark.sql.adaptive.coalescePartitions.minPartitionNum的值

targetSize取min(maxTargetSize,advisoryTargetSize),advisoryTargetSize也就是spark.sql.adaptive.advisoryPartitionSizeInBytes的值,所以说该值只是建议值,不一定是targetSize

while循环就是取相邻的分区合并,对于每个task中的每个相邻分区合并,直到不大于targetSize

OptimizeSkewedJoin.optimizeSkewJoin分析(数据倾斜优化的核心代码)


见optimizeSkewJoin如示:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
    case smj @ SortMergeJoinExec(_, _, joinType, _,
        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
        if supportedJoinTypes.contains(joinType) =>
      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
      val numPartitions = left.partitionsWithSizes.length
      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
      logDebug(
        s"""
          |Optimizing skewed join.
          |Left side partitions size info:
          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
          |Right side partitions size info:
          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
        """.stripMargin)
      val canSplitLeft = canSplitLeftSide(joinType)
      val canSplitRight = canSplitRightSide(joinType)
      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
      // the final data distribution is even (coalesced partitions + split partitions).
      val leftActualSizes = left.partitionsWithSizes.map(_._2)
      val rightActualSizes = right.partitionsWithSizes.map(_._2)
      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val leftSkewDesc = new SkewDesc
      val rightSkewDesc = new SkewDesc
      for (partitionIndex <- 0 until numPartitions) {
        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
          val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            left.mapStats.shuffleId, reducerId, leftTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(leftPartSpec))
        } else {
          Seq(leftPartSpec)
        }
        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts = if (isRightSkew && !isRightCoalesced) {
          val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            right.mapStats.shuffleId, reducerId, rightTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(rightPartSpec))
        } else {
          Seq(rightPartSpec)
        }
        for {
          leftSidePartition <- leftParts
          rightSidePartition <- rightParts
        } {
          leftSidePartitions += leftSidePartition
          rightSidePartitions += rightSidePartition
        }
      }
      logDebug("number of skewed partitions: " +
        s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
        val newLeft = CustomShuffleReaderExec(
          left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
        val newRight = CustomShuffleReaderExec(
          right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
        smj.copy(
          left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
      } else {
        smj
      }
  }

SortMergeJoinExec说明适用于sort merge join

assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)保证进行join的两个task的分区数相等

分别计算进行join的task的分区中位数的大小leftMedSize和rightMedSize

分别计算进行join的task的分区的targetzise大小leftTargetSize和rightTargetSize

循环判断两个task的每个分区的是否存在倾斜,如果倾斜且满足没有进行过shuffle分区合并,则进行倾斜分区处理,否则不处理

createSkewPartitionSpecs方法为:

1.获取每个join的task的对应分区的数据大小

2.根据targetSize分成多个slice

如果存在数据倾斜,则构造包装成CustomShuffleReaderExec,进行后续任务的运行,最最终调用ShuffledRowRDD的compute方法 匹配case PartialMapperPartitionSpec进行数据的读取,其中还会自动开启“spark.sql.adaptive.fetchShuffleBlocksInBatch”批量fetch减少io

OptimizeSkewedJoin/CoalesceShufflePartitions 在哪里被调用


如:AdaptiveSparkPlanExec

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf)
  )

可见在AdaptiveSparkPlanExec中被调用 ,且CoalesceShufflePartitions先于OptimizeSkewedJoin,

而AdaptiveSparkPlanExec在InsertAdaptiveSparkPlan中被调用

,而InsertAdaptiveSparkPlan在QueryExecution中被调用


而在InsertAdaptiveSparkPlan.shouldApplyAQE方法和supportAdaptive中我们看到

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
      plan.find {
        case _: Exchange => true
        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
        case p => p.expressions.exists(_.find {
          case _: SubqueryExpression => true
          case _ => false
        }.isDefined)
      }.isDefined
    }
  }
private def supportAdaptive(plan: SparkPlan): Boolean = {
    // TODO migrate dynamic-partition-pruning onto adaptive execution.
    sanityCheck(plan) &&
      !plan.logicalLink.exists(_.isStreaming) &&
      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
  }

如果不满足以上条件也是不会开启AQE的,如果要强制开启,也可以配置spark.sql.adaptive.forceApply 为true(文档中提示是内部配置)


注意:


在spark 3.0.1中已经废弃了如下的配置:

spark.sql.adaptive.skewedPartitionMaxSplits    
spark.sql.adaptive.skewedPartitionRowCountThreshold    
spark.sql.adaptive.skewedPartitionSizeThreshold   

本文部分参考:

https://mp.weixin.qq.com/s?__biz=MzA5MTc0NTMwNQ==&mid=2650718363&idx=1&sn=d20fffebafdd2bed6939eaeb39f5e6e3

https://mp.weixin.qq.com/s/RvFpXWpV8APcGTHhftS6NQ



相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
207 1
Spark快速大数据分析PDF下载读书分享推荐
|
8月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
159 0
|
8月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
275 0
|
8月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
8月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
100 0
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
206 2
|
3月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
131 0
|
6月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23748 42
|
8月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56619 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
7月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
211 1