SPARK闲杂--为什么复用Exchange和subquery

简介: SPARK闲杂--为什么复用Exchange和subquery

背景


本文基于Spark 3.3.0

我们在Spark代码中有时候会看到 exchangeReuseEnabled 和subqueryReuseEnabled 配置,这个配置的作用是什么,结合spark源码我们分析一下


分析


exchangeReuseEnabled

在PlanDynamicPruningFilters中我们可以看到:

        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.exists {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
              right.sameResult(sparkPlan)
            case _ => false
          }
        if (canReuseExchange) {
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          val mode = broadcastMode(buildKeys, executedPlan.output)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))


只有开启了重用Exchange的情况下且存在物理计划是BroadcastHashJoinExec的情况下(只有这样才能获取最大的收益),才会进行动态分区裁剪。其实在这里并没有做过多的操作,只不过是生成了一个BroadcastExchangeExec的操作,看到这里完全没看出来重用Exchange的作用在哪里。

原因是在 Rule ReuseExchangeAndSubquery中,这里会进行exchange的替换,如果存在一样的Exchange,就会进行替换,所以以上分区裁剪中的DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))涉及的BroadcastExchangeExec才会被复用。

但是为什么BroadcastExchangeExec复用了就会减少spark的计算呢?

还是拿DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) 举例:

InSubqueryExec 中 broadcastValues的类型是SubqueryBroadcastExec,而SubqueryBroadcastExec中的计算逻辑:


  @transient
  private lazy val relationFuture: Future[Array[InternalRow]] = {
    // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    Future {
      // This will run in another thread. Set the execution id so that we can connect these jobs
      // with the correct execution.
      SQLExecution.withExecutionId(session, executionId) {
        val beforeCollect = System.nanoTime()
        val broadcastRelation = child.executeBroadcast[HashedRelation]().value

可以看到这个relationFuture变量是lazy val修饰的(这样多次调动这个变量只会初始化一次,所以会较少driver端的计算量), 而relationFuture这个变量的初始化在:

  protected override def doPrepare(): Unit = {
    relationFuture
  }


而doPrepare方法的调用是在方法executeQuery driver端形成RDD的时候,如下:

  protected final def executeQuery[T](query: => T): T = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      waitForSubqueries()
      query
    }
  }

而在val broadcastRelation = child.executeBroadcastHashedRelation.value,这里的child是BroadcastExchangeExec类型的,这样进而触发广播操作。


subqueryReuseEnabled


拿InjectRuntimeFilter举例,在InjectRuntimeFilter的规则中,会最终形成以下逻辑计划:

    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
      ListQuery(aggregate, childOutputs = aggregate.output))
    Filter(filter, filterApplicationSidePlan)

而InSubquery最终进过PlanSubqueries规则会形成物理计划(在InjectRuntimeFilte中并不会形成InSubqueryExec,我们这里只是举例):


        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)

对于InSubqueryExc:

  def updateResult(): Unit = {
    val rows = plan.executeCollect()
    result = if (plan.output.length > 1) {
      rows.asInstanceOf[Array[Any]]
    } else {
      rows.map(_.get(0, child.dataType))
    }
    if (shouldBroadcast) {
      resultBroadcast = plan.session.sparkContext.broadcast(result)
    }
  }


该updataResult方法也是在executeQuery方法中(也是在drive端调用)被调用,而*plan.executeCollect()*中,plan是BaseSubqueryExec类型的,该类型的实现类中relationFuture变量如下(以SubqueryBroadcastExec为例):


private lazy val relationFuture

也是lazy val,也是只会初始化一次,所以在ReuseExchangeAndSubquery中对BaseSubqueryExec进行复用,就可以减少在driver端的计算

相关文章
|
SQL 流计算
Flink CDC这俩statement mode和batch mode啥区别
Flink CDC这俩statement mode和batch mode啥区别
112 1
|
分布式计算 Spark
Spark 中的 Rebalance 操作以及与Repartition操作的区别
Spark 中的 Rebalance 操作以及与Repartition操作的区别
937 0
|
存储 SQL JSON
Spark - Task 与 Partition 一一对应与参数详解
使用 spark 读取 parquet 文件,共有 M个 parquet 文件,于是启动了 PExecutor x QCores 进行如下 WordCount 代码测试,其中 P x Q = M 即 Core 数目与 parquet 文件数一一对应。
642 0
Spark - Task 与 Partition 一一对应与参数详解
|
SQL 存储 分布式计算
spark outer join push down filter rule(spark 外连接中的下推规则)
spark outer join push down filter rule(spark 外连接中的下推规则)
279 0
spark outer join push down filter rule(spark 外连接中的下推规则)
|
SQL 分布式计算 Spark
SPARK SQL中 Grouping sets转Expand怎么实现的(逻辑计划级别)
SPARK SQL中 Grouping sets转Expand怎么实现的(逻辑计划级别)
553 0
|
分布式计算 Spark
【spark系列9】spark 的动态分区裁剪上(Dynamic partition pruning)-逻辑计划
【spark系列9】spark 的动态分区裁剪上(Dynamic partition pruning)-逻辑计划
346 0
|
分布式计算 Spark
【spark系列11】spark 的动态分区裁剪下(Dynamic partition pruning)-物理计划
【spark系列11】spark 的动态分区裁剪下(Dynamic partition pruning)-物理计划
7729 0
|
SQL 人工智能 分布式计算
PostgreSQL 并行计算解说 之20 - parallel partition table wise join
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan
453 0