Background
This article is based on Spark 3.3.0.
We explore the logic of Spark's whole-stage code generation (Codegen) starting from a unit test:
test("SortAggregate should be included in WholeStageCodegen") { val df = spark.range(10).agg(max(col("id")), avg(col("id"))) withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") { val plan = df.queryExecution.executedPlan assert(plan.exists(p => p.isInstanceOf[WholeStageCodegenExec] && p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec])) assert(df.collect() === Array(Row(9, 4.5))) } }
The whole-stage-codegen portion of the second stage of the execution plan produced by this query looks like this:
```
WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- InputAdapter
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
```
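If you want to reproduce this plan yourself, something along the following lines should work. This is a minimal sketch: `spark.sql.test.forceApplySortAggregate` is an internal testing conf (the same one the unit test above sets), and the exact explain output format varies slightly across Spark versions.

```scala
import org.apache.spark.sql.functions.{avg, col, max}

// Force the planner to pick SortAggregate instead of HashAggregate
// (internal testing conf, as used by the unit test above).
spark.conf.set("spark.sql.test.forceApplySortAggregate", "true")

val df = spark.range(10).agg(max(col("id")), avg(col("id")))

// Physical plan with whole-stage-codegen stage markers: *(1), *(2), ...
df.explain()

// Dump the generated Java source of each WholeStageCodegen subtree.
df.explain("codegen")
```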
Analysis
Stage 2 WholeStageCodegen
The code generation of the second stage involves the produce and consume methods of SortAggregateExec, ShuffleExchangeExec, and InputAdapter. Let's analyze them one by one.
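For reference, the produce/consume contract that these operators implement lives in the `CodegenSupport` trait. The following is a simplified paraphrase, not the verbatim Spark source; method bodies and most members are omitted:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.SparkPlan

// Simplified paraphrase of org.apache.spark.sql.execution.CodegenSupport (Spark 3.3).
trait CodegenSupport extends SparkPlan {
  // The RDDs that feed the generated code (at most two are supported).
  def inputRDDs(): Seq[RDD[InternalRow]]

  // Called by the parent operator: "generate the code that produces your rows".
  // Delegates to doProduce().
  final def produce(ctx: CodegenContext, parent: CodegenSupport): String = ???

  // Operator-specific produce logic, e.g. InputAdapter's row-fetch loop.
  protected def doProduce(ctx: CodegenContext): String

  // Called by an operator to hand one produced row up to its parent:
  // generates the parent's doConsume(...) code inline.
  final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = ???

  // Operator-specific consume logic, e.g. updating aggregation buffers.
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = ???
}
```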
The data flow of the second-stage WholeStageCodegen is as follows:
```
WholeStageCodegenExec        SortAggregateExec(Final)        InputAdapter        ShuffleExchangeExec
====================================================================================================
-> execute()
     |
   doExecute() ----------> inputRDDs() ----------------> inputRDDs() ----------> execute()
     |                                                                               |
   doCodeGen()                                                                   doExecute()
     |                                                                               |
     +-----------------> produce()                                            ShuffledRowRDD
                             |
                         doProduce()
                             |
                         doProduceWithoutKeys() -------> produce()
                                                             |
                                                         doProduce()
                                                             |
                         doConsume() <------------------ consume()
                             |
                         doConsumeWithoutKeys()
                             |
   doConsume() <-------- consume()
                         // Note: this consume() is invoked by doProduceWithoutKeys(),
                         // not by doConsumeWithoutKeys().
```
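To make the left-hand column of the diagram concrete, here is a heavily simplified sketch of `WholeStageCodegenExec.doExecute` (paraphrased, not the verbatim source; metrics, codegen-fallback handling, and the two-input case are omitted):

```scala
// Simplified paraphrase of WholeStageCodegenExec.doExecute (single-input case).
override def doExecute(): RDD[InternalRow] = {
  // 1. Generate the Java source for the whole subtree. This call triggers
  //    the produce()/consume() traversal shown in the diagram above.
  val (ctx, cleanedSource) = doCodeGen()
  val references = ctx.references.toArray

  // 2. Collect the subtree's input RDDs. In our plan this bottoms out at
  //    InputAdapter.inputRDD, i.e. the ShuffledRowRDD built by the Exchange.
  val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()

  rdds.head.mapPartitionsWithIndex { (index, iter) =>
    // 3. Compile the generated source on the executor side and hand the
    //    partition iterator over to the generated class.
    val (clazz, _) = CodeGenerator.compile(cleanedSource)
    val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
    buffer.init(index, Array(iter))
    new Iterator[InternalRow] {
      override def hasNext: Boolean = buffer.hasNext
      override def next(): InternalRow = buffer.next()
    }
  }
}
```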
inputRDDs() of SortAggregateExec(Final)
```scala
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
```
This calls the child operator's inputRDDs(), i.e. SortAggregateExec's inputRDDs() method, which ultimately reaches InputAdapter's inputRDD method:
```scala
override def inputRDD: RDD[InternalRow] = child.execute()
```
In other words, it invokes ShuffleExchangeExec's execute() method:
```scala
protected override def doExecute(): RDD[InternalRow] = {
  // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}
```

This connects the whole chain end to end.
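As a final illustration of where the ShuffledRowRDD's rows actually enter the generated code: InputAdapter's produce path emits a plain row-fetch loop over `inputs[0]`. The snippet below is a simplified paraphrase of `InputRDDCodegen.doProduce` (the trait InputAdapter mixes in); the limit checks and metrics present in the real source are omitted.

```scala
// Simplified paraphrase of InputRDDCodegen.doProduce (Spark 3.3).
override def doProduce(ctx: CodegenContext): String = {
  // "inputs[0]" is the partition iterator handed to BufferedRowIterator.init()
  // by WholeStageCodegenExec.doExecute, i.e. the ShuffledRowRDD's rows.
  val input = ctx.addMutableState("scala.collection.Iterator", "input",
    v => s"$v = inputs[0];", forceInline = true)
  val row = ctx.freshName("row")
  // The call to consume() below is what splices the parent's doConsume()
  // code inline into this loop.
  s"""
     |while ($input.hasNext()) {
     |  InternalRow $row = (InternalRow) $input.next();
     |  ${consume(ctx, null, row).trim}
     |}
   """.stripMargin
}
```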