SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(1)

简介: SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(1)

背景


本文基于 SPARK 3.3.0

从一个unit test来探究SPARK Codegen的逻辑,


  test("SortAggregate should be included in WholeStageCodegen") {
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    }
  }

分析


执行计划的真实面目

Spark的全代码流程,网上和代码中都有提及,如下:


  WholeStageCodegen       Plan A               FakeInput        Plan B
  =========================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
      |
      +----------------->   produce()
                              |
                           doProduce()  -------> produce()
                                                    |
                                                 doProduce()
                                                    |
                          doConsume() <--------- consume()
                              |
   doConsume()  <--------  consume()

整体逻辑都知道,道理都懂,但是里面涉及到了好多细节,就拿以上的例子来说,会生成如下的执行计划:

*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

但是实际在缕代码的时候,你就会发现这个全代码的逻辑根本就缕不通,那是因为CollapseCodegenStages规则会加WholeStageCodegenExec和InputAdapter物理计划,加了以后的计划为:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
  InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
    WholeStageCodegen
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

注意:类似有*(1),*(2)这种符号的,表明是有全代码生成的,而为什么在物理计划的时候没有显示 WholeStageCodegenExec 和 InputAdapter 计划是因为该两个计划重写了generateTreeString方法:

WholeStageCodegenExec 重写为如下:

override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      if (printNodeId) "* " else s"*($codegenStageId) ",
      false,
      maxFields,
      printNodeId,
      indent)
  }

这里的*($codegenStageId) 就是上面所说的*(1),*(2),而这里的数字1和2代表者不同的两个代码生成阶段,因为Exchange不支持代码生成,所以被隔离成了两个代码生成。而*($codegenStageId) 作为子计划的前缀传递到了下游。

InputAdapter 重写为如下:


 override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      prefix = "",
      addSuffix = false,
      maxFields,
      printNodeId,
      indent)

看到这里的prefix = “”,所以单纯从执行计划看是没有任何迹象能表明存在着InputAdapter计划.

所以说,我们最后应该看到的数据流应为:

第一阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Partial)     RangeExec        
  =========================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() 
      |
   doCodeGen()
      |
      +----------------->   produce()
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume()<------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

第二阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()


相关文章
|
4月前
|
分布式计算 算法 数据挖掘
Spark中的图计算库GraphX是什么?请解释其作用和常用操作。
Spark中的图计算库GraphX是什么?请解释其作用和常用操作。
37 1
|
9月前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch并行与分布式(三)DataParallel原理、源码解析、举例实战
PyTorch并行与分布式(三)DataParallel原理、源码解析、举例实战
321 0
|
SQL 分布式计算 Serverless
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(6)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(6)
75 0
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(9)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(9)
92 0
|
SQL 分布式计算 数据处理
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(7)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(7)
92 0
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
145 0
|
缓存 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(2)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(2)
105 0
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(5)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(5)
141 0
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(4)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(4)
367 0
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(8)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(8)
172 0