【spark系列9】spark 的动态分区裁剪上(Dynamic partition pruning)-逻辑计划

简介: 【spark系列9】spark 的动态分区裁剪上(Dynamic partition pruning)-逻辑计划

背景


本文基于delta 0.7.0

spark 3.0.1

spark 3.x引入了动态分区裁剪,今天我们分析以下代码是怎么实现的

分析


直接定位到PartitionPruning.applyPartitionPruning是逻辑计划的规则

override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
  • 当是该逻辑计划是子查询且该子查询是相关的,则直接跳过,因为相关的子查询将会被重写到join条件中
  • 如果没有开启动态分区,则直接跳过
  • 其他条件则会跳到下一步
    下一步的条件,则是会判断是否是包含join操作,如果是join操作才会进行后续的操作:
private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // skip this rule if there's already a DPP subquery on the LHS of a join
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>

具体分析一下每一步:

var newLeft = left
        var newRight = right
        // extract the left and right keys of the join condition
        val (leftKeys, rightKeys) = j match {
          case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _) => (lkeys, rkeys)
          case _ => (Nil, Nil)
        }
        //ExtractEquiJoinKeys的unapply方法
        def unapply(join: Join): Option[ReturnType] = join match {
    case Join(left, right, joinType, condition, hint) =>
      logDebug(s"Considering join on: $condition")
      // Find equi-join predicates that can be evaluated before the join, and thus can be used
      // as join keys.
      val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
      val joinKeys = predicates.flatMap {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
        // Replace null with default value for joining key, then those rows with null in it could
        // be joined together
        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
          Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
            Coalesce(Seq(r, Literal.default(r.dataType)))),
            (IsNull(l), IsNull(r))
          )
        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
          Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
            Coalesce(Seq(l, Literal.default(l.dataType)))),
            (IsNull(r), IsNull(l))
          )
        case other => None
      }

ExtractEquiJoinKeys用来提取and条件分隔的多个条件,之后只有条件满足相等的才能进行下一步处理:


如果相等但是左边或者右边的表达式的为空,则不匹配

如果相等而且有对应的逻辑计划能够产生对应的属性值,则匹配

如果是EqualNullsafe,且有相应的逻辑能够产生相应的属性值,则会转换为Coalesce和isnull的判断

之后转化为leftKeys和rightKeys表达式

如join的条件是:tableA.a1 = tableB.b2 AND tableA.a2=tableB.b2

则经过该过程得到的结果为leftKey为:Seq(tableA.a1,tableA.a2) rightKeys为:Seq(tableB.b1,tableB.b2)

 splitConjunctivePredicates(condition).foreach {
          case EqualTo(a: Expression, b: Expression)
              if fromDifferentSides(a, b) =>
            val (l, r) = if (a.references.subsetOf(left.outputSet) &&
              b.references.subsetOf(right.outputSet)) {
              a -> b
            } else {
              b -> a
            }
            // there should be a partitioned table and a filter on the dimension table,
            // otherwise the pruning will not trigger
            var partScan = getPartitionTableScan(l, left)
            if (partScan.isDefined && canPruneLeft(joinType) &&
                hasPartitionPruningFilter(right)) {
              val hasBenefit = pruningHasBenefit(l, partScan.get, r, right)
              newLeft = insertPredicate(l, newLeft, r, right, rightKeys, hasBenefit)
            } else {
              partScan = getPartitionTableScan(r, right)
              if (partScan.isDefined && canPruneRight(joinType) &&
                  hasPartitionPruningFilter(left) ) {
                val hasBenefit = pruningHasBenefit(r, partScan.get, l, left)
                newRight = insertPredicate(r, newRight, l, left, leftKeys, hasBenefit)
              }
            }
          case _ =>
        }

对每一个Equals对,先对左边表达式进行getPartitionTableScan 操作,该方法的作用是:


找到该表达式的最终逻辑计划,并且返回

只有该逻辑计划是HadoopFsRelation类型且存在partition列的时候,才返回该逻辑计划

如果join左边逻辑计划满足getPartitionTableScan,且join的类型是innerjoin/leftSemi/RightOuter,且该join右边逻辑计划不是一个流且存在比如> <这种的filter, 才会在左边逻辑计划插入一个DynamicPruningSubquery的父节点,但是插入该节点还有两个条件是pruningHasBenefit或者SQLConf.get.exchangeReuseEnabled 满足,默认SQLConf.get.exchangeReuseEnabled是ture

对于右边的逻辑计划也是类似的处理方式。只不过join的类型要求为inner/LeftOuter

pruningHasBenefit方法的计算逻辑为:

如果filterRatio*getPartitionTableScan.stats.sizeInByte>该逻辑计划涉及的所有叶子节点.stats.sizeInByte 则可以添加DynamicPruningSubquery


返回整个新的join操作

 Join(newLeft, newRight, joinType, Some(condition), hint
相关文章
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
111 1
|
8月前
|
SQL 存储 分布式计算
Hive 和 Spark 分区策略剖析
Hive 和 Spark 分区策略剖析
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
2天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之使用spark.sql执行rename分区操作,遇到任务报错退出的情况,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
13天前
|
分布式计算 监控 大数据
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
|
1月前
|
存储 缓存 分布式计算
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
|
1月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
10月前
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
1月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
1月前
|
分布式计算 Hadoop 大数据
Spark 【分区与并行度】
Spark 【分区与并行度】