Analysis of ShufflePartitionsUtil.coalescePartitions (the core partition-coalescing code)
def coalescePartitions(
    mapOutputStatistics: Array[MapOutputStatistics],
    advisoryTargetSize: Long,
    minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
  // If `minNumPartitions` is very large, it is possible that we need to use a value less than
  // `advisoryTargetSize` as the target size of a coalesced task.
  val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
  // The max at here is to make sure that when we have an empty table, we only have a single
  // coalesced partition.
  // There is no particular reason that we pick 16. We just need a number to prevent
  // `maxTargetSize` from being set to 0.
  val maxTargetSize = math.max(
    math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
  val targetSize = math.min(maxTargetSize, advisoryTargetSize)

  val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
  logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
    s"actual target size $targetSize.")

  // Make sure these shuffles have the same number of partitions.
  val distinctNumShufflePartitions =
    mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
  // The reason that we are expecting a single value of the number of shuffle partitions
  // is that when we add Exchanges, we set the number of shuffle partitions
  // (i.e. map output partitions) using a static setting, which is the value of
  // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
  // number of partitions, they will have the same number of shuffle partitions
  // (i.e. map output partitions).
  assert(
    distinctNumShufflePartitions.length == 1,
    "There should be only one distinct value of the number of shuffle partitions " +
      "among registered Exchange operators.")

  val numPartitions = distinctNumShufflePartitions.head
  val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
  var latestSplitPoint = 0
  var coalescedSize = 0L
  var i = 0
  while (i < numPartitions) {
    // We calculate the total size of i-th shuffle partitions from all shuffles.
    var totalSizeOfCurrentPartition = 0L
    var j = 0
    while (j < mapOutputStatistics.length) {
      totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
      j += 1
    }
    // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
    // new coalesced partition.
    if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
      partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
      latestSplitPoint = i
      // reset postShuffleInputSize.
      coalescedSize = totalSizeOfCurrentPartition
    } else {
      coalescedSize += totalSizeOfCurrentPartition
    }
    i += 1
  }
  partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
  partitionSpecs
}
totalPostShuffleInputSize first computes the total size of the shuffle data across all map outputs.
maxTargetSize is max(totalPostShuffleInputSize / minNumPartitions, 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum.
targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; in other words, that setting is only a suggestion and is not necessarily the final targetSize.
The while loop merges adjacent shuffle partitions: it keeps adding consecutive partitions into the current coalesced partition until including the next one would exceed targetSize, at which point a new coalesced partition is started.
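To make the target-size computation and the merging loop concrete, here is a minimal, self-contained Scala sketch (not Spark source; the partition sizes and the advisoryTargetSize/minNumPartitions values are made up for illustration) that applies the same greedy strategy:

object CoalesceSketch {
  // Hypothetical per-reducer-partition sizes in bytes (one shuffle, 6 partitions).
  val bytesByPartitionId: Array[Long] = Array(10L, 5L, 3L, 40L, 2L, 1L)

  def main(args: Array[String]): Unit = {
    val advisoryTargetSize = 20L // stands in for spark.sql.adaptive.advisoryPartitionSizeInBytes
    val minNumPartitions = 1     // stands in for spark.sql.adaptive.coalescePartitions.minPartitionNum

    val totalSize = bytesByPartitionId.sum
    val maxTargetSize = math.max(math.ceil(totalSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize) // 20 in this example

    // Greedily merge adjacent partitions until adding the next one would exceed targetSize.
    val specs = scala.collection.mutable.ArrayBuffer[(Int, Int)]() // (startReducerIndex, endReducerIndex)
    var start = 0
    var coalescedSize = 0L
    for (i <- bytesByPartitionId.indices) {
      if (i > start && coalescedSize + bytesByPartitionId(i) > targetSize) {
        specs += ((start, i))
        start = i
        coalescedSize = bytesByPartitionId(i)
      } else {
        coalescedSize += bytesByPartitionId(i)
      }
    }
    specs += ((start, bytesByPartitionId.length))

    // Prints (0,3), (3,4), (4,6): [10+5+3], [40], [2+1]. A single oversized partition (40)
    // is never split by this rule; it simply becomes a coalesced partition on its own.
    println(specs.mkString(", "))
  }
}

Note that the coalescing rule only merges, it never splits; splitting oversized partitions is left to the skew-join optimization described next.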
Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core skew-optimization code)
The optimizeSkewJoin method is shown below:
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(_, _, joinType, _,
      s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
      s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
      if supportedJoinTypes.contains(joinType) =>
    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
    val numPartitions = left.partitionsWithSizes.length
    // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
    val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
    val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
    logDebug(
      s"""
        |Optimizing skewed join.
        |Left side partitions size info:
        |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
        |Right side partitions size info:
        |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
      """.stripMargin)
    val canSplitLeft = canSplitLeftSide(joinType)
    val canSplitRight = canSplitRightSide(joinType)
    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
    // the final data distribution is even (coalesced partitions + split partitions).
    val leftActualSizes = left.partitionsWithSizes.map(_._2)
    val rightActualSizes = right.partitionsWithSizes.map(_._2)
    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val leftSkewDesc = new SkewDesc
    val rightSkewDesc = new SkewDesc
    for (partitionIndex <- 0 until numPartitions) {
      val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

      val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
        val reducerId = leftPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          left.mapStats.shuffleId, reducerId, leftTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(leftPartSpec))
      } else {
        Seq(leftPartSpec)
      }

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val rightParts = if (isRightSkew && !isRightCoalesced) {
        val reducerId = rightPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          right.mapStats.shuffleId, reducerId, rightTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(rightPartSpec))
      } else {
        Seq(rightPartSpec)
      }

      for {
        leftSidePartition <- leftParts
        rightSidePartition <- rightParts
      } {
        leftSidePartitions += leftSidePartition
        rightSidePartitions += rightSidePartition
      }
    }

    logDebug("number of skewed partitions: " +
      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
      val newLeft = CustomShuffleReaderExec(
        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
      val newRight = CustomShuffleReaderExec(
        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
      smj.copy(
        left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
    } else {
      smj
    }
}
The SortMergeJoinExec pattern shows that this optimization only applies to sort merge joins.
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) guarantees that the two sides of the join have the same number of (coalesced) shuffle partitions.
leftMedSize and rightMedSize are the median partition sizes of the left and right join sides.
leftTargetSize and rightTargetSize are the target sizes computed for the left and right join sides.
The loop then checks every partition of both sides for skew: a partition that is skewed and has not already been coalesced is split into smaller parts; otherwise it is left as-is.
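As a rough, self-contained illustration of the skew check (a sketch only: the partition sizes and thresholds below are made up; the rule assumed here, that a partition is skewed when it is larger than the median times spark.sql.adaptive.skewJoin.skewedPartitionFactor and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, follows the Spark 3.0 documentation):

object SkewDetectSketch {
  // Hypothetical coalesced partition sizes in bytes for one side of the join.
  val partitionSizes: Seq[Long] = Seq(64L, 70L, 2000L, 80L, 66L)

  // Made-up thresholds so the example is easy to follow; the real values come from
  // spark.sql.adaptive.skewJoin.skewedPartitionFactor / skewedPartitionThresholdInBytes.
  val skewedPartitionFactor = 5L
  val skewedPartitionThreshold = 100L

  def medianSize(sizes: Seq[Long]): Long = {
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewedPartitionFactor && size > skewedPartitionThreshold

  def main(args: Array[String]): Unit = {
    val med = medianSize(partitionSizes) // 70
    partitionSizes.zipWithIndex.foreach { case (size, idx) =>
      println(s"partition $idx: size=$size skewed=${isSkewed(size, med)}")
    }
    // Only partition 2 (2000 bytes) is reported as skewed: 2000 > 70 * 5 and 2000 > 100.
  }
}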
The createSkewPartitionSpecs method (see the sketch after this list):
1. Gets the size of each map output for the corresponding reducer partition of the join side.
2. Splits them into multiple slices according to targetSize.
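A simplified, self-contained sketch of the slicing idea in step 2 (illustration only, with made-up map output sizes; the real implementation also handles corner cases such as very small adjacent slices):

object SplitSkewSketch {
  // Hypothetical sizes in bytes that each map task produced for ONE skewed reducer partition.
  val mapPartitionSizes: Array[Long] = Array(30L, 30L, 5L, 40L, 10L, 25L)

  // Group consecutive map outputs into slices whose total size stays close to targetSize.
  // Each (startMapIndex, endMapIndex) pair plays the role of one PartialReducerPartitionSpec range.
  def splitByTargetSize(sizes: Array[Long], targetSize: Long): Seq[(Int, Int)] = {
    val slices = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
    var start = 0
    var currentSize = 0L
    for (i <- sizes.indices) {
      if (i > start && currentSize + sizes(i) > targetSize) {
        slices += ((start, i))
        start = i
        currentSize = sizes(i)
      } else {
        currentSize += sizes(i)
      }
    }
    slices += ((start, sizes.length))
    slices.toSeq
  }

  def main(args: Array[String]): Unit = {
    // With targetSize = 64, the skewed reducer partition is read by 3 smaller tasks
    // covering map ranges [0,2), [2,5), [5,6).
    println(splitByTargetSize(mapPartitionSizes, 64L).mkString(", "))
  }
}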
If skew is detected, the split partition specs are wrapped into a CustomShuffleReaderExec, which is what the subsequent tasks run against; eventually ShuffledRowRDD.compute matches the PartialReducerPartitionSpec case to read the data, and with spark.sql.adaptive.fetchShuffleBlocksInBatch enabled, contiguous shuffle blocks are fetched in batches to reduce IO.
Where OptimizeSkewedJoin / CoalesceShufflePartitions are invoked
In AdaptiveSparkPlanExec:
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReuseAdaptiveSubquery(conf, context.subqueryCache),
  CoalesceShufflePartitions(context.session),
  // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
  // added by `CoalesceShufflePartitions`. So they must be executed after it.
  OptimizeSkewedJoin(conf),
  OptimizeLocalShuffleReader(conf)
)
So both rules are applied inside AdaptiveSparkPlanExec, and CoalesceShufflePartitions runs before OptimizeSkewedJoin.
AdaptiveSparkPlanExec itself is inserted by the InsertAdaptiveSparkPlan rule, and InsertAdaptiveSparkPlan is applied in QueryExecution.
In InsertAdaptiveSparkPlan.shouldApplyAQE and supportAdaptive we can see:
private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
    plan.find {
      case _: Exchange => true
      case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
      case p => p.expressions.exists(_.find {
        case _: SubqueryExpression => true
        case _ => false
      }.isDefined)
    }.isDefined
  }
}

private def supportAdaptive(plan: SparkPlan): Boolean = {
  // TODO migrate dynamic-partition-pruning onto adaptive execution.
  sanityCheck(plan) &&
    !plan.logicalLink.exists(_.isStreaming) &&
    !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
}
If these conditions are not met, AQE will not be applied either. To force it on, you can set spark.sql.adaptive.forceApply to true (the documentation notes that this is an internal configuration).
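For reference, a minimal way to turn all of this on from user code (Spark 3.0.x; the size values are arbitrary examples, and forcing AQE via the internal spark.sql.adaptive.forceApply option is normally unnecessary):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object AqeConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-config-sketch")
      // Master switch for adaptive query execution.
      .config("spark.sql.adaptive.enabled", "true")
      // Partition coalescing (CoalesceShufflePartitions).
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
      // Skew join handling (OptimizeSkewedJoin).
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      // Batched fetch of contiguous shuffle blocks.
      .config("spark.sql.adaptive.fetchShuffleBlocksInBatch", "true")
      // Internal: force AQE even when shouldApplyAQE would return false.
      .config("spark.sql.adaptive.forceApply", "true")
      .getOrCreate()

    // Any query with a shuffle (join/aggregate) now goes through AdaptiveSparkPlanExec.
    spark.range(0, 1000).groupBy((col("id") % 10).as("k")).count().show()
    spark.stop()
  }
}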
Note:
The following configurations have been deprecated in Spark 3.0.1:
spark.sql.adaptive.skewedPartitionMaxSplits
spark.sql.adaptive.skewedPartitionRowCountThreshold
spark.sql.adaptive.skewedPartitionSizeThreshold
This article partly references:
https://mp.weixin.qq.com/s/RvFpXWpV8APcGTHhftS6NQ