SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)

简介: SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)

背景


本文基于 SPARK 3.3.0

从一个unit test来探究SPARK Codegen的逻辑,

  test("SortAggregate should be included in WholeStageCodegen") {
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    }
  }

该sql形成的执行计划第二部分的全代码生成部分如下:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
   InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]


分析


第二阶段wholeStageCodegen

第二阶段的代码生成涉及到SortAggregateExec和ShuffleExchangeExec以及InputAdapter的produce和consume方法,这里一一来分析:

第二阶段wholeStageCodegen数据流如下:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()


SortAggregateExec(Final) 的inputRDDs()


val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()

调用的是子类的inputRDDS,也就是SortAggregateExec的inputRDDS方法,最终调用到InputAdaptor的inputRDD方法:

  override def inputRDD: RDD[InternalRow] = child.execute()

,也就是调用的是ShuffleExchangeExecexecute方法

protected override def doExecute(): RDD[InternalRow] = {


// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) {
  cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
}
cachedShuffleRDD
 ```
 这样整个链路就串联起来了。


相关文章
|
分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
113 0
|
分布式计算 Java Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
218 0
|
SQL 分布式计算 Spark
Spark中的WholeStageCodegenExec(全代码生成)
Spark中的WholeStageCodegenExec(全代码生成)
491 0
Spark中的WholeStageCodegenExec(全代码生成)
|
1月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
83 1
Spark快速大数据分析PDF下载读书分享推荐
|
13天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
56 3
|
13天前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
37 2
|
2月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
152 59
|
21天前
|
分布式计算 Hadoop 大数据
Hadoop与Spark在大数据处理中的对比
【7月更文挑战第30天】Hadoop和Spark在大数据处理中各有优势,选择哪个框架取决于具体的应用场景和需求。Hadoop适合处理大规模数据的离线分析,而Spark则更适合需要快速响应和迭代计算的应用场景。在实际应用中,可以根据数据处理的需求、系统的可扩展性、成本效益等因素综合考虑,选择适合的框架进行大数据处理。
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享