SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)

简介: SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)

背景


本文基于 SPARK 3.3.0

从一个unit test来探究SPARK Codegen的逻辑,

  test("SortAggregate should be included in WholeStageCodegen") {
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    }
  }

该sql形成的执行计划第二部分的全代码生成部分如下:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
   InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]


分析


第二阶段wholeStageCodegen

第二阶段的代码生成涉及到SortAggregateExec和ShuffleExchangeExec以及InputAdapter的produce和consume方法,这里一一来分析:

第二阶段wholeStageCodegen数据流如下:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()


SortAggregateExec(Final) 的inputRDDs()


val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()

调用的是子类的inputRDDS,也就是SortAggregateExec的inputRDDS方法,最终调用到InputAdaptor的inputRDD方法:

  override def inputRDD: RDD[InternalRow] = child.execute()

,也就是调用的是ShuffleExchangeExecexecute方法

protected override def doExecute(): RDD[InternalRow] = {


// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) {
  cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
}
cachedShuffleRDD
 ```
 这样整个链路就串联起来了。


相关文章
|
分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
101 0
|
分布式计算 Java Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
175 0
|
SQL 分布式计算 Spark
Spark中的WholeStageCodegenExec(全代码生成)
Spark中的WholeStageCodegenExec(全代码生成)
430 0
Spark中的WholeStageCodegenExec(全代码生成)
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
161 0
|
19天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
4月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
7天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
3月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
129 0
|
4月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理