Background
This article is based on Spark 3.3.0.
We use a unit test to explore the logic of Spark's code generation (Codegen):
test("SortAggregate should be included in WholeStageCodegen") {
  val df = spark.range(10).agg(max(col("id")), avg(col("id")))
  withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
    val plan = df.queryExecution.executedPlan
    assert(plan.exists(p =>
      p.isInstanceOf[WholeStageCodegenExec] &&
        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
    assert(df.collect() === Array(Row(9, 4.5)))
  }
}
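Analysis
What the execution plan really looks like
The flow of Spark's whole-stage code generation is described both in online articles and in the source code itself, as follows: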
 WholeStageCodegen       Plan A               FakeInput        Plan B
=========================================================================

-> execute()
    |
 doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
    |
    +----------------->   produce()
                            |
                         doProduce()  -------> produce()
                                                  |
                                               doProduce()
                                                  |
                        doConsume() <--------- consume()
                            |
 doConsume()  <--------  consume()
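To make the produce()/consume() handshake in the diagram concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's real `CodegenSupport`: the `Op` trait and the `Source`, `Filter` and `WholeStage` classes are hypothetical names invented for illustration, and the "generated code" is just a string. It assumes only what the diagram shows: `produce()` travels from parent to child, and `consume()` travels back from child to parent.

```scala
// Toy model of the produce()/consume() handshake. These are NOT Spark classes;
// every name here is a hypothetical stand-in used for illustration only.
object ProduceConsumeSketch {

  trait Op {
    // The operator that asked us to produce; it receives our rows via consume().
    private var parent: Op = null

    // Top-down call: remember who asked, then emit the row-producing code.
    final def produce(p: Op): String = { parent = p; doProduce() }

    // Bottom-up call: hand one row (a variable name) to the parent.
    final def consume(row: String): String = parent.doConsume(row)

    def doProduce(): String
    def doConsume(row: String): String
  }

  // Leaf operator ("Plan B"): owns the loop that drives the whole stage.
  class Source(n: Int) extends Op {
    def doProduce(): String = {
      val body = consume("i") // the parent's per-row logic is inlined into the loop
      s"for (int i = 0; i < $n; i++) { $body }"
    }
    def doConsume(row: String): String =
      throw new UnsupportedOperationException("a leaf has no child to consume from")
  }

  // Middle operator ("Plan A"): delegates produce() down, wraps rows on the way up.
  class Filter(child: Op, cond: String => String) extends Op {
    def doProduce(): String = child.produce(this)
    def doConsume(row: String): String = {
      val inner = consume(row)
      s"if (${cond(row)}) { $inner }"
    }
  }

  // Root of the stage: starts the produce() chain and terminates the consume() chain.
  class WholeStage(child: Op) extends Op {
    def generate(): String = child.produce(this)
    def doProduce(): String =
      throw new UnsupportedOperationException("the root only calls produce() on its child")
    def doConsume(row: String): String = s"append($row);"
  }

  def demo(): String =
    new WholeStage(new Filter(new Source(3), r => s"$r > 1")).generate()
}
```

Calling `ProduceConsumeSketch.demo()` returns `for (int i = 0; i < 3; i++) { if (i > 1) { append(i); } }`: the leaf owns the loop, and each parent's `doConsume` body is inlined into it, which is the fusion that whole-stage code generation is after.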
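The overall logic is well known and easy to accept in principle, but many details are involved. For the example above, the following execution plan is generated: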
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)
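However, when you actually trace through the code, the whole-stage codegen flow does not line up with this plan. The reason is that the CollapseCodegenStages rule inserts WholeStageCodegenExec and InputAdapter physical operators. After they are inserted, the plan is: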
WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
   InputAdapter
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
      WholeStageCodegen
      +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
         +- *(1) Range (0, 10, step=1, splits=2)
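How the two extra operators end up in the plan can be sketched as a pair of mutually recursive functions. The following is a simplified toy in plain Scala, not the real CollapseCodegenStages rule: `Node`, `WholeStage`, `InputAdapter`, `Collapser` and the `supportsCodegen` flag are hypothetical stand-ins, and the real rule applies more conditions than a single boolean. The sketch also assumes that stage ids are handed out only after the subtree has been processed, which would explain why the inner stage is numbered 1 and the outer stage 2.

```scala
// Toy sketch of wrapping codegen-capable subtrees. NOT the real Spark rule;
// all names below are hypothetical and the logic is heavily simplified.
object CollapseSketch {

  sealed trait Plan { def children: Seq[Plan] }
  case class Node(name: String, supportsCodegen: Boolean, children: Seq[Plan] = Nil) extends Plan
  case class WholeStage(child: Plan, stageId: Int) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  case class InputAdapter(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  class Collapser {
    private var counter = 0

    // Outside a stage: open a new stage at the first operator that supports codegen.
    def insertWholeStage(p: Plan): Plan = p match {
      case n: Node if n.supportsCodegen =>
        val body = insertInputAdapter(n) // process the subtree first ...
        counter += 1                     // ... so inner stages receive smaller ids
        WholeStage(body, counter)
      case n: Node => n.copy(children = n.children.map(insertWholeStage))
      case other   => other
    }

    // Inside a stage: cut the stage at the first operator that lacks codegen support.
    def insertInputAdapter(p: Plan): Plan = p match {
      case n: Node if !n.supportsCodegen => InputAdapter(insertWholeStage(n))
      case n: Node => n.copy(children = n.children.map(insertInputAdapter))
      case other   => other
    }
  }

  // The plan from this article, reduced to operator names.
  def demo(): Plan = {
    val range    = Node("Range", supportsCodegen = true)
    val partial  = Node("SortAggregate(partial)", supportsCodegen = true, Seq(range))
    val exchange = Node("Exchange", supportsCodegen = false, Seq(partial))
    val fin      = Node("SortAggregate(final)", supportsCodegen = true, Seq(exchange))
    new Collapser().insertWholeStage(fin)
  }
}
```

In the result of `CollapseSketch.demo()`, the final aggregate is wrapped in `WholeStage(..., 2)`, the Exchange sits under an `InputAdapter`, and the partial aggregate together with Range is wrapped in `WholeStage(..., 1)`, matching the shape of the plan above.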
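Note: a marker such as *(1) or *(2) means that the operator takes part in whole-stage code generation. WholeStageCodegenExec and InputAdapter do not show up as separate nodes in the printed physical plan because both operators override the generateTreeString method:
WholeStageCodegenExec overrides it as follows: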
override def generateTreeString(
    depth: Int,
    lastChildren: Seq[Boolean],
    append: String => Unit,
    verbose: Boolean,
    prefix: String = "",
    addSuffix: Boolean = false,
    maxFields: Int,
    printNodeId: Boolean,
    indent: Int = 0): Unit = {
  child.generateTreeString(
    depth,
    lastChildren,
    append,
    verbose,
    if (printNodeId) "* " else s"*($codegenStageId) ",
    false,
    maxFields,
    printNodeId,
    indent)
}
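The *($codegenStageId) here is the *(1) and *(2) mentioned above. The numbers 1 and 2 denote two different code generation stages: because Exchange does not support code generation, the plan is split into two separate codegen stages. The *($codegenStageId) string is passed down to the child plan as its prefix.
InputAdapter overrides it as follows: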
override def generateTreeString(
    depth: Int,
    lastChildren: Seq[Boolean],
    append: String => Unit,
    verbose: Boolean,
    prefix: String = "",
    addSuffix: Boolean = false,
    maxFields: Int,
    printNodeId: Boolean,
    indent: Int = 0): Unit = {
  child.generateTreeString(
    depth,
    lastChildren,
    append,
    verbose,
    prefix = "",
    addSuffix = false,
    maxFields,
    printNodeId,
    indent)
}
Notice that prefix = "" is passed here, which resets the *(n) prefix, so nothing in the printed execution plan indicates that an InputAdapter is present.
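The effect of the two overrides on the printed plan can be reproduced with a small toy in plain Scala. This is only a sketch of the prefix-passing idea: `Node`, `WholeStage`, `InputAdapter` and the simplified `treeString(depth, prefix)` signature are hypothetical, and the sketch assumes that an ordinary node forwards the prefix it received to its children.

```scala
// Toy sketch of prefix propagation while printing a plan tree.
// NOT Spark's generateTreeString; names and signature are hypothetical.
object TreeStringSketch {

  sealed trait Plan {
    def treeString(depth: Int, prefix: String): List[String]
  }

  // Ordinary operator: prints itself and forwards the same prefix to its children.
  case class Node(name: String, children: List[Plan] = Nil) extends Plan {
    def treeString(depth: Int, prefix: String): List[String] = {
      val indent = if (depth == 0) "" else "   " * (depth - 1) + "+- "
      (indent + prefix + name) :: children.flatMap(_.treeString(depth + 1, prefix))
    }
  }

  // Prints no line of its own; only injects the "*(id) " prefix at the same depth.
  case class WholeStage(child: Plan, stageId: Int) extends Plan {
    def treeString(depth: Int, prefix: String): List[String] =
      child.treeString(depth, s"*($stageId) ")
  }

  // Prints no line of its own; resets the prefix so its child shows no star.
  case class InputAdapter(child: Plan) extends Plan {
    def treeString(depth: Int, prefix: String): List[String] =
      child.treeString(depth, "")
  }

  def demo(): List[String] = {
    val stage1 = WholeStage(Node("SortAggregate(partial)", List(Node("Range"))), 1)
    val stage2 = WholeStage(
      Node("SortAggregate(final)", List(InputAdapter(Node("Exchange", List(stage1))))), 2)
    stage2.treeString(0, "")
  }
}
```

`TreeStringSketch.demo()` yields four lines: `*(2) SortAggregate(final)`, `+- Exchange`, `   +- *(1) SortAggregate(partial)` and `      +- *(1) Range`. Neither wrapper prints a line of its own, and Exchange carries no star because the InputAdapter above it cleared the prefix.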
So the data flow we should actually see is as follows.
Stage 1 WholeStageCodegen:
 WholeStageCodegenExec    SortAggregateExec(Partial)        RangeExec
=========================================================================

-> execute()
    |
 doExecute() --------->   inputRDDs() ----------------->   inputRDDs()
    |
 doCodeGen()
    |
    +----------------->   produce()
                            |
                          doProduce()
                            |
                          doProduceWithoutKeys() ------>   produce()
                                                             |
                                                           doProduce()
                                                             |
                          doConsume() <-----------------   consume()
                            |
                          doConsumeWithoutKeys()
                            |   (consume() is called by doProduceWithoutKeys(), not by doConsumeWithoutKeys())
 doConsume() <---------   consume()
Stage 2 WholeStageCodegen:
 WholeStageCodegenExec    SortAggregateExec(Final)          InputAdapter         ShuffleExchangeExec
====================================================================================================

-> execute()
    |
 doExecute() --------->   inputRDDs() ----------------->   inputRDDs() ------->  execute()
    |                                                                               |
 doCodeGen()                                                                     doExecute()
    |                                                                               |
    +----------------->   produce()                                              ShuffledRowRDD
                            |
                          doProduce()
                            |
                          doProduceWithoutKeys() ------>   produce()
                                                             |
                                                           doProduce()
                                                             |
                          doConsume() <-----------------   consume()
                            |
                          doConsumeWithoutKeys()
                            |   (consume() is called by doProduceWithoutKeys(), not by doConsumeWithoutKeys())
 doConsume() <---------   consume()
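The note inside both diagrams, that consume() is issued from the produce side of the aggregate rather than from its consume side, is easy to miss. The following plain-Scala toy illustrates it under stated assumptions: `Op`, `RangeOp`, `MaxAgg` and `WholeStage` are hypothetical names, the emitted "code" is a string, and a single max aggregate stands in for the real buffer handling. The only behaviour taken from the diagrams is that the aggregate's doConsume just updates its buffer, while the result is passed to the parent after the child's loop.

```scala
// Toy sketch of an aggregate without grouping keys in the produce/consume model.
// NOT Spark's SortAggregateExec; all names are hypothetical illustrations.
object AggWithoutKeysSketch {

  trait Op {
    private var parent: Op = null
    final def produce(p: Op): String = { parent = p; doProduce() }
    final def consume(row: String): String = parent.doConsume(row)
    def doProduce(): String
    def doConsume(row: String): String
  }

  // Leaf: emits the loop and inlines the parent's per-row logic into it.
  class RangeOp(n: Int) extends Op {
    def doProduce(): String = {
      val body = consume("i")
      s"for (long i = 0; i < $n; i++) { $body }"
    }
    def doConsume(row: String): String =
      throw new UnsupportedOperationException("a leaf has no child")
  }

  class MaxAgg(child: Op) extends Op {
    // Plays the role of doProduceWithoutKeys: init the buffer, run the child's
    // loop, then hand the final value to OUR parent once, after the loop.
    def doProduce(): String = {
      val loop = child.produce(this)
      val emit = consume("max")
      s"long max = Long.MIN_VALUE; $loop $emit"
    }
    // Plays the role of doConsumeWithoutKeys: update the buffer only.
    // It never calls consume(), so no row leaves the aggregate from here.
    def doConsume(row: String): String = s"max = Math.max(max, $row);"
  }

  class WholeStage(child: Op) extends Op {
    def generate(): String = child.produce(this)
    def doProduce(): String =
      throw new UnsupportedOperationException("the root only calls produce() on its child")
    def doConsume(row: String): String = s"append($row);"
  }

  def demo(): String = new WholeStage(new MaxAgg(new RangeOp(10))).generate()
}
```

`AggWithoutKeysSketch.demo()` returns `long max = Long.MIN_VALUE; for (long i = 0; i < 10; i++) { max = Math.max(max, i); } append(max);`. The buffer update sits inside the child's loop, while `append(max);` appears once after it, which is why the last arrow in the diagrams originates from the produce side.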