【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(一)

简介: 【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(一)

AQE简介


从spark configuration,到在最早在spark 1.6版本就已经有了AQE;到了spark 2.x版本,intel大数据团队进行了相应的原型开发和实践;到了spark 3.0时代,Databricks和intel一起为社区贡献了新的AQE


spark 3.0.1中的AQE的配置

image.png

分析1


在OptimizeSkewedJoin.scala中,我们看到ADVISORY_PARTITION_SIZE_IN_BYTES,也就是spark.sql.adaptive.advisoryPartitionSizeInBytes被引用的地方, (OptimizeSkewedJoin是物理计划中的规则)

 /**
   * The goal of skew join optimization is to make the data distribution more even. The target size
   * to split skewed partitions is the average size of non-skewed partition, or the
   * advisory partition size if avg size is smaller than it.
   */
  private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // It's impossible that all the partitions are skewed, as we use median size to define skew.
    assert(nonSkewSizes.nonEmpty)
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

其中:


nonSkewSizes为task非倾斜的分区

targetSize返回的是max(非倾斜的分区的平均值,advisorySize),其中advisorySize为spark.sql.adaptive.advisoryPartitionSizeInBytes值,所以说

targetSize不一定是spark.sql.adaptive.advisoryPartitionSizeInBytes值

medianSize值为task的分区大小的中位值

分析2


在SQLConf.scala

def numShufflePartitions: Int = {
    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
    } else {
      defaultNumShufflePartitions
    }
  }

从spark 3.0.1开始如果开启了AQE和shuffle分区合并,则用的是spark.sql.adaptive.coalescePartitions.initialPartitionNum,这在如果有多个shuffle stage的情况下,增加分区数,可以有效的增强shuffle分区合并的效果


分析3


在CoalesceShufflePartitions.scala,CoalesceShufflePartitions是一个物理计划的规则,会执行如下操作

 if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)
      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
  }

也就是说:


如果是用户自己指定的分区操作,如repartition操作,spark.sql.adaptive.coalescePartitions.minPartitionNum无效,且跳过分区合并优化

如果多个task进行shuffle,且task有不同的分区数的话,spark.sql.adaptive.coalescePartitions.minPartitionNum无效,且跳过分区合并优化

见ShufflePartitionsUtil.coalescePartition分析

分析4


在OptimizeSkewedJoin.scala中,我们看到

/**
   * A partition is considered as a skewed partition if its size is larger than the median
   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
  private def isSkewed(size: Long, medianSize: Long): Boolean = {
    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
  }

OptimizeSkewedJoin是个物理计划的规则,会根据isSkewed来判断是否数据数据有倾斜,而且必须是满足SKEW_JOIN_SKEWED_PARTITION_FACTOR和SKEW_JOIN_SKEWED_PARTITION_THRESHOLD才会判断为数据倾斜了

medianSize为task的分区大小的中位值

分析5


在AdaptiveSparkPlanExec方法getFinalPhysicalPlan中调用了reOptimize方法,而reOptimize方法则会执行逻辑计划的优化操作:

private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
    logicalPlan.invalidateStatsCache()
    val optimized = optimizer.execute(logicalPlan)
    val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
    val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
    (newPlan, optimized)
  }

而optimizer 中有个DemoteBroadcastHashJoin规则:

@transient private val optimizer = new RuleExecutor[LogicalPlan] {
    // TODO add more optimization rules
    override protected def batches: Seq[Batch] = Seq(
      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
    )
  }

而对于DemoteBroadcastHashJoin则有对是否broadcastjoin的判断:

case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {
  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
      && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}

shouldDemote就是对是否进行broadcastjoin的判断:


首先得是ShuffleQueryStageExec操作

如果非空分区比列大于nonEmptyPartitionRatioForBroadcastJoin,也就是spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,则不会把mergehashjoin转换为broadcastJoin

这在sql中先join在groupby的场景中比较容易出现


相关文章
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
189 1
Spark快速大数据分析PDF下载读书分享推荐
|
7月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
146 0
|
7月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
271 0
|
7月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
7月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
96 0
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
179 2
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
111 0
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23729 42
|
7月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56611 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
174 1