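Introduction to AQE
Judging from the Spark configuration, AQE has existed since as early as Spark 1.6. In the Spark 2.x era, Intel's big data team built and tested a prototype. With Spark 3.0, Databricks and Intel jointly contributed the new AQE to the community.
AQE configuration in Spark 3.0.1
Analysis 1
In OptimizeSkewedJoin.scala we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, that is, spark.sql.adaptive.advisoryPartitionSizeInBytes, is referenced (OptimizeSkewedJoin is a physical plan rule):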
/**
 * The goal of skew join optimization is to make the data distribution more even. The target size
 * to split skewed partitions is the average size of non-skewed partition, or the
 * advisory partition size if avg size is smaller than it.
 */
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
  val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
  val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
  // It's impossible that all the partitions are skewed, as we use median size to define skew.
  assert(nonSkewSizes.nonEmpty)
  math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}
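Here:
nonSkewSizes holds the sizes of the non-skewed partitions.
targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes, so targetSize is not necessarily equal to spark.sql.adaptive.advisoryPartitionSizeInBytes.
medianSize is the median of the partition sizes.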
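To make the max(...) behaviour concrete, here is a minimal standalone sketch of the same computation. It is not Spark's code: the object name TargetSizeSketch is hypothetical, and the skew test is passed in as a plain predicate (an assumption made so the sketch does not need SQLConf).

```scala
object TargetSizeSketch {
  // Same shape as targetSize in the excerpt above, but with the advisory size and
  // the skew predicate supplied by the caller (assumption for illustration).
  def targetSize(sizes: Seq[Long], advisorySize: Long, isSkewed: Long => Boolean): Long = {
    val nonSkewSizes = sizes.filterNot(isSkewed)
    // As in the original, at least one partition must be non-skewed.
    require(nonSkewSizes.nonEmpty, "at least one partition must be non-skewed")
    // Integer average of the non-skewed sizes, floored by the advisory size.
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }
}
```

With sizes 100, 120, 110 and 2000 (the last one counted as skewed), the non-skewed average is 110. An advisory size of 64 therefore yields a target of 110, while an advisory size of 256 yields 256.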
Analysis 2
In SQLConf.scala:
def numShufflePartitions: Int = {
  if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
  } else {
    defaultNumShufflePartitions
  }
}
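Starting with Spark 3.0.1, if both AQE and shuffle partition coalescing are enabled, the number of shuffle partitions comes from spark.sql.adaptive.coalescePartitions.initialPartitionNum, falling back to the default number of shuffle partitions when that option is not set. When a query has multiple shuffle stages, setting a larger initial partition number gives shuffle partition coalescing more room to work and makes it more effective.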
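The fallback logic can be sketched outside Spark as follows. The Conf case class and its field names are hypothetical stand-ins for the corresponding SQLConf entries; only the branching mirrors the excerpt above.

```scala
object ShufflePartitionsSketch {
  // Hypothetical stand-in for the relevant SQLConf settings.
  final case class Conf(
      adaptiveEnabled: Boolean,
      coalesceEnabled: Boolean,
      initialPartitionNum: Option[Int],
      defaultNumShufflePartitions: Int)

  // initialPartitionNum is consulted only when both AQE and coalescing are on;
  // otherwise, or when it is unset, the default is used.
  def numShufflePartitions(c: Conf): Int =
    if (c.adaptiveEnabled && c.coalesceEnabled) {
      c.initialPartitionNum.getOrElse(c.defaultNumShufflePartitions)
    } else {
      c.defaultNumShufflePartitions
    }
}
```

For example, Conf(true, true, Some(1000), 200) gives 1000, while turning either flag off or leaving initialPartitionNum as None gives 200.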
Analysis 3
In CoalesceShufflePartitions.scala, CoalesceShufflePartitions is a physical plan rule that performs the following:
if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
  plan
} else {
  // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
  // we should skip it when calculating the `partitionStartIndices`.
  val validMetrics = shuffleStages.flatMap(_.mapStats)

  // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
  // in that case. For example when we union fully aggregated data (data is arranged to a single
  // partition) and a result of a SortMergeJoin (multiple partitions).
  val distinctNumPreShufflePartitions =
    validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
  if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
      .getOrElse(session.sparkContext.defaultParallelism)
    val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
      validMetrics.toArray,
      advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
      minNumPartitions = minPartitionNum)
    // This transformation adds new nodes, so we must use `transformUp` here.
    val stageIds = shuffleStages.map(_.id).toSet
    plan.transformUp {
      // even for shuffle exchange whose input RDD has 0 partition, we should still update its
      // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
      // number of output partitions.
      case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
        CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
    }
  } else {
    plan
  }
}
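In other words:
If the partitioning was specified explicitly by the user, for example through a repartition operation, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and the partition coalescing optimization is skipped.
If several shuffle stages are involved and they have different numbers of partitions, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and the partition coalescing optimization is skipped.
See the analysis of ShufflePartitionsUtil.coalescePartitions.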
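The two skip conditions above can be condensed into a small predicate. This is a sketch under stated assumptions, not Spark's code: StageInfo and canCoalesce are hypothetical names, and each stage is reduced to the two facts the rule inspects.

```scala
object CoalescePreconditionsSketch {
  // Hypothetical summary of one shuffle stage: whether its partition count may be
  // changed, and its per-partition byte sizes when map statistics are available.
  final case class StageInfo(
      canChangeNumPartitions: Boolean,
      bytesByPartitionId: Option[Seq[Long]])

  def canCoalesce(stages: Seq[StageInfo]): Boolean = {
    if (!stages.forall(_.canChangeNumPartitions)) {
      // e.g. a user-specified repartition: leave the plan untouched
      false
    } else {
      // Stages without statistics are ignored, as with `flatMap(_.mapStats)`.
      val validMetrics = stages.flatMap(_.bytesByPartitionId)
      val distinctCounts = validMetrics.map(_.length).distinct
      // All remaining stages must agree on the pre-shuffle partition count.
      validMetrics.nonEmpty && distinctCounts.length == 1
    }
  }
}
```

Only when this predicate holds would the rule go on to compute coalesced partition specs from the advisory size and the minimum partition number.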
Analysis 4
In OptimizeSkewedJoin.scala we see:
/**
 * A partition is considered as a skewed partition if its size is larger than the median
 * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
 * ADVISORY_PARTITION_SIZE_IN_BYTES.
 */
private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
    size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}
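OptimizeSkewedJoin is a physical plan rule that uses isSkewed to decide whether the data is skewed. A partition is treated as skewed only when it satisfies both conditions, the one based on SKEW_JOIN_SKEWED_PARTITION_FACTOR and the one based on SKEW_JOIN_SKEWED_PARTITION_THRESHOLD. Note that the doc comment mentions ADVISORY_PARTITION_SIZE_IN_BYTES, but the code actually compares against SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.
medianSize is the median of the partition sizes.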
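A small standalone sketch shows how the two conditions interact. The object name, the median helper, and the numeric values in the usage note are assumptions for illustration; in particular, the median helper is a plain textbook median and is not copied from Spark.

```scala
object SkewCheckSketch {
  // Plain median of the partition sizes (illustrative helper, not Spark's code).
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "sizes must not be empty")
    val sorted = sizes.sorted
    val n = sorted.length
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2 else sorted(n / 2)
  }

  // Both conditions must hold: relatively large (factor) and absolutely large (threshold).
  def isSkewed(size: Long, medianSize: Long, factor: Int, threshold: Long): Boolean =
    size > medianSize * factor && size > threshold
}
```

Take partition sizes 10, 12, 11, 400 and 9, whose median is 11, with an illustrative factor of 5 and threshold of 256. The partition of size 400 is skewed because 400 > 55 and 400 > 256. A partition of size 100 would pass the factor test but fail the threshold test, so it is not skewed.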
Analysis 5
In AdaptiveSparkPlanExec, the getFinalPhysicalPlan method calls reOptimize, which in turn re-runs optimization on the logical plan:
private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
  logicalPlan.invalidateStatsCache()
  val optimized = optimizer.execute(logicalPlan)
  val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
  val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
  (newPlan, optimized)
}
The optimizer contains a DemoteBroadcastHashJoin rule:
@transient private val optimizer = new RuleExecutor[LogicalPlan] {
  // TODO add more optimization rules
  override protected def batches: Seq[Batch] = Seq(
    Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
  )
}
DemoteBroadcastHashJoin in turn decides whether a broadcast join should be used:
case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
      && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}
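shouldDemote is the check that decides whether a join side should be kept from being broadcast:
First, the plan node must be a ShuffleQueryStageExec.
If the ratio of non-empty partitions is less than nonEmptyPartitionRatioForBroadcastJoin, that is, spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin, the side is marked with NO_BROADCAST_HASH and the sort merge join is not converted to a broadcast join.
This situation comes up fairly often in SQL that joins first and then does a group by.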
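The ratio check can be tried out in isolation with a sketch like the one below. The object name and the threshold values passed in are assumptions for illustration; the arithmetic follows the shouldDemote excerpt above.

```scala
object DemoteSketch {
  // Returns true when too few partitions are non-empty, i.e. the side should be
  // demoted (hinted with NO_BROADCAST_HASH in the rule shown above).
  def shouldDemote(bytesByPartitionId: Seq[Long], nonEmptyRatioThreshold: Double): Boolean = {
    val partitionCnt = bytesByPartitionId.length
    val nonZeroCnt = bytesByPartitionId.count(_ > 0)
    partitionCnt > 0 && nonZeroCnt > 0 &&
      (nonZeroCnt * 1.0 / partitionCnt) < nonEmptyRatioThreshold
  }
}
```

With ten partitions of which only one holds data, the non-empty ratio is 0.1, so a threshold of 0.2 demotes that side. With three non-empty partitions out of four (ratio 0.75) it does not. A stage whose partitions are all empty is never demoted by this check, because nonZeroCnt must be greater than zero.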