背景
本文基于delta 0.7.0
spark 3.0.1
spark 3.x引入了动态分区裁剪,今天我们分析以下代码是怎么实现的
分析
直接定位到PartitionPruning.applyPartitionPruning是逻辑计划的规则
override def apply(plan: LogicalPlan): LogicalPlan = plan match { // Do not rewrite subqueries. case s: Subquery if s.correlated => plan case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan case _ => prune(plan) }
- 当是该逻辑计划是子查询且该子查询是相关的,则直接跳过,因为相关的子查询将会被重写到join条件中
- 如果没有开启动态分区,则直接跳过
- 其他条件则会跳到下一步
下一步的条件,则是会判断是否是包含join操作,如果是join操作才会进行后续的操作:
private def prune(plan: LogicalPlan): LogicalPlan = { plan transformUp { // skip this rule if there's already a DPP subquery on the LHS of a join case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j case j @ Join(left, right, joinType, Some(condition), hint) =>
具体分析一下每一步:
var newLeft = left var newRight = right // extract the left and right keys of the join condition val (leftKeys, rightKeys) = j match { case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _) => (lkeys, rkeys) case _ => (Nil, Nil) } //ExtractEquiJoinKeys的unapply方法 def unapply(join: Join): Option[ReturnType] = join match { case Join(left, right, joinType, condition, hint) => logDebug(s"Considering join on: $condition") // Find equi-join predicates that can be evaluated before the join, and thus can be used // as join keys. val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil) val joinKeys = predicates.flatMap { case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r)) case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l)) // Replace null with default value for joining key, then those rows with null in it could // be joined together case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Seq((Coalesce(Seq(l, Literal.default(l.dataType))), Coalesce(Seq(r, Literal.default(r.dataType)))), (IsNull(l), IsNull(r)) ) case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Seq((Coalesce(Seq(r, Literal.default(r.dataType))), Coalesce(Seq(l, Literal.default(l.dataType)))), (IsNull(r), IsNull(l)) ) case other => None }
ExtractEquiJoinKeys用来提取and条件分隔的多个条件,之后只有条件满足相等的才能进行下一步处理:
如果相等但是左边或者右边的表达式的为空,则不匹配
如果相等而且有对应的逻辑计划能够产生对应的属性值,则匹配
如果是EqualNullsafe,且有相应的逻辑能够产生相应的属性值,则会转换为Coalesce和isnull的判断
之后转化为leftKeys和rightKeys表达式
如join的条件是:tableA.a1 = tableB.b2 AND tableA.a2=tableB.b2
则经过该过程得到的结果为leftKey为:Seq(tableA.a1,tableA.a2) rightKeys为:Seq(tableB.b1,tableB.b2)
splitConjunctivePredicates(condition).foreach { case EqualTo(a: Expression, b: Expression) if fromDifferentSides(a, b) => val (l, r) = if (a.references.subsetOf(left.outputSet) && b.references.subsetOf(right.outputSet)) { a -> b } else { b -> a } // there should be a partitioned table and a filter on the dimension table, // otherwise the pruning will not trigger var partScan = getPartitionTableScan(l, left) if (partScan.isDefined && canPruneLeft(joinType) && hasPartitionPruningFilter(right)) { val hasBenefit = pruningHasBenefit(l, partScan.get, r, right) newLeft = insertPredicate(l, newLeft, r, right, rightKeys, hasBenefit) } else { partScan = getPartitionTableScan(r, right) if (partScan.isDefined && canPruneRight(joinType) && hasPartitionPruningFilter(left) ) { val hasBenefit = pruningHasBenefit(r, partScan.get, l, left) newRight = insertPredicate(r, newRight, l, left, leftKeys, hasBenefit) } } case _ => }
对每一个Equals对,先对左边表达式进行getPartitionTableScan 操作,该方法的作用是:
找到该表达式的最终逻辑计划,并且返回
只有该逻辑计划是HadoopFsRelation类型且存在partition列的时候,才返回该逻辑计划
如果join左边逻辑计划满足getPartitionTableScan,且join的类型是innerjoin/leftSemi/RightOuter,且该join右边逻辑计划不是一个流且存在比如> <这种的filter, 才会在左边逻辑计划插入一个DynamicPruningSubquery的父节点,但是插入该节点还有两个条件是pruningHasBenefit或者SQLConf.get.exchangeReuseEnabled 满足,默认SQLConf.get.exchangeReuseEnabled是ture
对于右边的逻辑计划也是类似的处理方式。只不过join的类型要求为inner/LeftOuter
pruningHasBenefit方法的计算逻辑为:
如果filterRatio*getPartitionTableScan.stats.sizeInByte>该逻辑计划涉及的所有叶子节点.stats.sizeInByte 则可以添加DynamicPruningSubquery
返回整个新的join操作
Join(newLeft, newRight, joinType, Some(condition), hint