SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(1)

简介: SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(1)

背景


本文基于 SPARK 3.3.0

从一个unit test来探究SPARK Codegen的逻辑,


  test("SortAggregate should be included in WholeStageCodegen") {
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    }
  }

分析


执行计划的真实面目

Spark的全代码流程,网上和代码中都有提及,如下:


  WholeStageCodegen       Plan A               FakeInput        Plan B
  =========================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
      |
      +----------------->   produce()
                              |
                           doProduce()  -------> produce()
                                                    |
                                                 doProduce()
                                                    |
                          doConsume() <--------- consume()
                              |
   doConsume()  <--------  consume()

整体逻辑都知道,道理都懂,但是里面涉及到了好多细节,就拿以上的例子来说,会生成如下的执行计划:

*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

但是实际在缕代码的时候,你就会发现这个全代码的逻辑根本就缕不通,那是因为CollapseCodegenStages规则会加WholeStageCodegenExec和InputAdapter物理计划,加了以后的计划为:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
  InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
    WholeStageCodegen
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

注意:类似有*(1),*(2)这种符号的,表明是有全代码生成的,而为什么在物理计划的时候没有显示 WholeStageCodegenExec 和 InputAdapter 计划是因为该两个计划重写了generateTreeString方法:

WholeStageCodegenExec 重写为如下:

override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      if (printNodeId) "* " else s"*($codegenStageId) ",
      false,
      maxFields,
      printNodeId,
      indent)
  }

这里的*($codegenStageId) 就是上面所说的*(1),*(2),而这里的数字1和2代表者不同的两个代码生成阶段,因为Exchange不支持代码生成,所以被隔离成了两个代码生成。而*($codegenStageId) 作为子计划的前缀传递到了下游。

InputAdapter 重写为如下:


 override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      prefix = "",
      addSuffix = false,
      maxFields,
      printNodeId,
      indent)

看到这里的prefix = “”,所以单纯从执行计划看是没有任何迹象能表明存在着InputAdapter计划.

所以说,我们最后应该看到的数据流应为:

第一阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Partial)     RangeExec        
  =========================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() 
      |
   doCodeGen()
      |
      +----------------->   produce()
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume()<------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

第二阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()


相关文章
|
Kubernetes 负载均衡 安全
【K8S系列】深入解析k8s网络插件—Cilium
【K8S系列】深入解析k8s网络插件—Cilium
2710 1
|
存储 Unix 编译器
汇编语言----X86汇编指令
汇编语言----X86汇编指令
1212 2
|
存储 安全 Java
高并发神器!ConcurrentHashMap为何如此高效?
本文介绍了Java中的线程安全集合类`ConcurrentHashMap`,详细对比了其与`Hashtable`的性能差异,并解析了JDK 1.7和1.8中`ConcurrentHashMap`的底层实现。通过分段锁和CAS机制,`ConcurrentHashMap`在多线程环境下表现出色,是高并发场景下的理想选择。
247 1
|
缓存 自然语言处理 JavaScript
Web服务器的动态内容生成与处理
【8月更文第28天】在Web开发领域,动态内容生成是指根据用户请求实时生成页面内容的过程。这与静态内容生成不同,后者的内容在部署时就已经确定,不会随用户的请求而改变。动态内容生成通常依赖于服务器端脚本语言,例如PHP、Node.js等,它们能够根据不同的请求参数生成特定的响应数据。本文将探讨几种流行的服务器端脚本语言在动态网页生成中的作用及其优化方法,并提供相应的代码示例。
499 0
|
存储 运维 监控
怎样配置Linux分析工具:kdump篇
在运维的世界里,服务器的稳定运行是生命的灯塔,一旦遭遇异常重启,便是暴风雨来临的预兆。作为一名运维工程师,深知在这场与故障斗争的战役中,武器的锋利至关重要。今天,我要介绍的主角/工具——kdump,正是这样一款能在风雨来临之际,为我们捕获那一闪而过的真相的工具。
怎样配置Linux分析工具:kdump篇
|
自然语言处理 Java
ElasticSearch 实现分词全文检索 - 高亮查询
ElasticSearch 实现分词全文检索 - 高亮查询
215 0
|
存储 弹性计算 安全
云计算服务选型与成本分析
【7月更文挑战第2天】云计算服务选型与成本分析聚焦企业如何在IaaS、PaaS、SaaS间抉择,考虑云提供商的技术实力、服务范围、成本效益和支持。成本分析涉及硬件、软件和服务成本,通过简单回收期、投资回报率和净现值法评估效益。优化资源配置、弹性伸缩和合理计费是成本控制关键,助力企业高效利用云计算。
|
Cloud Native Java 关系型数据库
【阿里云云原生专栏】构建云原生应用:基于Spring Boot与阿里云服务的全栈指南
【5月更文挑战第21天】构建云原生应用是企业数字化转型的关键,本文提供了一份基于Spring Boot和阿里云的全栈指南。涵盖从阿里云账号注册、ECS与Docker搭建,到Spring Boot项目创建、业务代码编写和部署。此外,还介绍了如何集成阿里云OSS存储、RDS数据库服务以及ACK容器服务,助力打造高效、可扩展和易管理的云原生应用。
1176 3
|
SQL HIVE
【Hive SQL 每日一题】统计最近7天内连续下单3日的用户量
创建了一个名为`sales`的测试表,包含`user_id`、`product_id`、`quantity`和`sale_date`字段,插入了多条销售数据。需求是找出最近7天内连续下单3天的用户数量。SQL查询通过分组和窗口函数`row_number()`检查日期连续性,最终计算满足条件的唯一用户数。示例结果显示有3名用户符合条件。
529 0
|
人工智能 安全 机器人
新手必看!ChatGPT常见问题总整理,你遇到了几个?
新手必看!ChatGPT常见问题总整理,你遇到了几个?