SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(8)

简介: SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(8)

背景


本文基于 SPARK 3.3.0

从一个unit test来探究SPARK Codegen的逻辑,

  test("SortAggregate should be included in WholeStageCodegen") {
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    }
  }

该sql形成的执行计划第二部分的全代码生成部分如下:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
   InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]

分析


第二阶段wholeStageCodegen

第二阶段的代码生成涉及到SortAggregateExec和ShuffleExchangeExec以及InputAdapter的produce和consume方法,这里一一来分析:

第二阶段wholeStageCodegen数据流如下:


 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

SortAggregateExec(Final) 的doProduce

这里只列出和SortAggregateExec(Partial)的不同的部分:

    val (resultVars, genResult) = if (modes.contains(Final) || modes.contains(Complete)) {
      // evaluate aggregate results
      ctx.currentVars = flatBufVars
      val aggResults = bindReferences(
        functions.map(_.evaluateExpression),
        aggregateBufferAttributes).map(_.genCode(ctx))
      val evaluateAggResults = evaluateVariables(aggResults)
      // evaluate result expressions
      ctx.currentVars = aggResults
      val resultVars = bindReferences(resultExpressions, aggregateAttributes).map(_.genCode(ctx))
      (resultVars,
        s"""
           |$evaluateAggResults
           |${evaluateVariables(resultVars)}
         """.stripMargin)


因为我们这里是Final部分,所以我们的数据流和Partial是不同的

ctx.currentVars = flatBufVars

赋值currentVars为当前buffer变量,便于下面进行数据绑定,该buffer变量是全局变量

val aggResults = bindReferences

functions.map(_.evaluateExpression) 这是对最终输出结果的计算,对于SUM来说是Divide(sum.cast(resultType), count.cast(resultType), failOnError = false) ,生成的代码如下:

   boolean sortAgg_isNull_6 = sortAgg_bufIsNull_2;
   double sortAgg_value_6 = -1.0;
   if (!sortAgg_bufIsNull_2) {
     sortAgg_value_6 = (double) sortAgg_bufValue_2;
   }
   boolean sortAgg_isNull_4 = false;
   double sortAgg_value_4 = -1.0;
   if (sortAgg_isNull_6 || sortAgg_value_6 == 0) {
     sortAgg_isNull_4 = true;
   } else {
     if (sortAgg_bufIsNull_1) {
       sortAgg_isNull_4 = true;
     } else {
       sortAgg_value_4 = (double)(sortAgg_bufValue_1 / sortAgg_value_6);
     }
   }

aggregateBufferAttributes 聚合函数的buffer属性值 sum :: count :: Nil

这样在绑定数据的变量数据的时候和currentVars是一一对应的

val evaluateAggResults = evaluateVariables(aggResults)

对聚合的结果进行最终的计算

ctx.currentVars = aggResults

把最终结果的变量赋值给currentVars,便于后面的数据绑定

val resultVars = bindReferences(resultExpressions, aggregateAttributes).map(_.genCode(ctx))

这一步是把聚合结果的变量绑定到聚合表达式中,

其中resultExpressions为List( avg(id#0L)#3 AS avg(id)#6) (这里我们只考虑AVG)

aggregateAttributes是resultExpression的AttributeReference的一种表达,便于在BoundReference的时候进行映射绑定

对应的ExprCode为ExprCode(,sortAgg_isNull_4,sortAgg_value_4))


InputAdaptor的 doProduce


InputAdaptor的主要作用是承上启下,用来适配不支持Codegen的物理计划,sql如下:

  override def doProduce(ctx: CodegenContext): String = {
 // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
 val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
   forceInline = true)
 val row = ctx.freshName("row")
 val outputVars = if (createUnsafeProjection) {
   // creating the vars will make the parent consume add an unsafe projection.
   ctx.INPUT_ROW = row
   ctx.currentVars = null
   output.zipWithIndex.map { case (a, i) =>
     BoundReference(i, a.dataType, a.nullable).genCode(ctx)
   }
 } else {
   null
 }
 val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) {
   val numOutputRows = metricTerm(ctx, "numOutputRows")
   s"$numOutputRows.add(1);"
 } else {
   ""
 }
 s"""
    | while ($limitNotReachedCond $input.hasNext()) {
    |   InternalRow $row = (InternalRow) $input.next();
    |   ${updateNumOutputRowsMetrics}
    |   ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
    |   ${shouldStopCheckCode}
    | }
  """.stripMargin
}

val input = ctx.addMutableState(“scala.collection.Iterator”, “input”, v => s"$v = inputs[0];"

定义一个input变量用来接受sortaggregate(partial)的输出的InteralRow(unsafeRow),对应的初始化方法会在init方法中调用

val row = ctx.freshName(“row”)

定义一个临时变量用来接受input中的unsafe类型的InteralRow,便于进行迭代操作

val outputVars = if (createUnsafeProjection)

对于InputAdaptor来说createUnsafeProjection是 false, 所以这块返回的是null

val updateNumOutputRowsMetrics =

因为metrics不满足条件,所以这里也是返回空字符串

代码组装

    s"""
   | while ($limitNotReachedCond $input.hasNext()) {
   |   InternalRow $row = (InternalRow) $input.next();
   |   ${updateNumOutputRowsMetrics}
   |   ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
   |   ${shouldStopCheckCode}
   | }
 """.stripMargin

对输入的每一行数据进行迭代操作, 之后再调用consume方法,

注意: 这里的consume传入的是row,是InteralRow类型,而不是在RangeExec中的Long类型的变量


InputAdaptor的 consume


我们这里只说明和之前不一样的部分,对应的sql如下:

  final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String =

注意这里的参数 outputVarsnull

rowInteralRow类型的变量

  • val inputVarsCandidate =
 val inputVarsCandidate =
   if (outputVars != null) {
     assert(outputVars.length == output.length)
     // outputVars will be used to generate the code for UnsafeRow, so we should copy them
     outputVars.map(_.copy())
   } else {
     assert(row != null, "outputVars and row cannot both be null.")
     ctx.currentVars = null
     ctx.INPUT_ROW = row
     output.zipWithIndex.map { case (attr, i) =>
       BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
     }
  }

这里的数据流向了 else :

  • ctx.INPUT_ROW = row
    设置当前的INPUT_ROWrow
    BoundReferencedoGenCode方法也是走向了另一个分支:
   assert(ctx.INPUT_ROW != null, "INPUT_ROW and currentVars cannot both be null.")
   val javaType = JavaCode.javaType(dataType)
   val value = CodeGenerator.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
   if (nullable) {
     ev.copy(code =
       code"""
          |boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal);
          |$javaType ${ev.value} = ${ev.isNull} ?
          |  ${CodeGenerator.defaultValue(dataType)} : ($value);
        """.stripMargin)
   } else {
     ev.copy(code = code"$javaType ${ev.value} = $value;", isNull = FalseLiteral)
   }

分析

val value = CodeGenerator.getValue(ctx.INPUT_ROW, dataType,ordinal.toString)

根据数据类型的不同,调用UnsafeRow的不同方法


if (nullable)

因为AttributeReference("sum", sumDataType)()和AttributeReference("count", LongType)()表达式 nullable 为 TRUE,所以生成的代码为:

boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
long inputadapter_value_0 = inputadapter_isNull_0 ?
-1L : (inputadapter_row_0.getLong(0));
boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
double inputadapter_value_1 = inputadapter_isNull_1 ?
-1.0 : (inputadapter_row_0.getDouble(1));
boolean inputadapter_isNull_2 = inputadapter_row_0.isNullAt(2);
long inputadapter_value_2 = inputadapter_isNull_2 ?
-1L : (inputadapter_row_0.getLong(2));
  • constructDoConsumeFunction方法中inputVarsInFunc
    这里会多一个名为inputadapter_row_0的InternalRow类型的实参
相关文章
|
资源调度 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
295 1
|
前端开发 数据可视化 Java
程序员最喜欢用的 8 种代码对比工具,值得收入囊中
程序员最喜欢用的 8 种代码对比工具,值得收入囊中
1641 0
|
存储 计算机视觉 Python
搭建Python-OpenCV开发环境(包括Python、Pycharm、OpenCV下载 图文解释)
搭建Python-OpenCV开发环境(包括Python、Pycharm、OpenCV下载 图文解释)
1561 0
|
运维 网络虚拟化 5G
带你读《ONAP技术详解与应用实践》之一:网络自动化挑战及ONAP介绍
国内首部系统剖析ONAP的书籍,也是理论性与实战性兼具的网络自动化实践指导书!本书详细全面地介绍了网络自动化的挑战和发展趋势,以及ONAP的概况、架构设计理念、设计原则、各模块实现细节、关键特性、应用场景和案例实践等。通过本书读者可以深入理解ONAP,提升对网络自动化及相关领域的认知。作者及其团队成员均是华为网络开源领域的专家,长期参与社区的治理、贡献和回馈,致力于通过产业协作,打造统一的平台,降低集成成本,加快新技术导入,助力新一代网络运维系统升级。同时,本书也融入了作者及其团队在网络开源领域的深刻洞察和见解,书中分享了华为参与网络开源的实践经验,是电信网络转型的重要参考。
|
人工智能 自然语言处理 PyTorch
Text2Video Huggingface Pipeline 文生视频接口和文生视频论文API
文生视频是AI领域热点,很多文生视频的大模型都是基于 Huggingface的 diffusers的text to video的pipeline来开发。国内外也有非常多的优秀产品如Runway AI、Pika AI 、可灵King AI、通义千问、智谱的文生视频模型等等。为了方便调用,这篇博客也尝试了使用 PyPI的text2video的python库的Wrapper类进行调用,下面会给大家介绍一下Huggingface Text to Video Pipeline的调用方式以及使用通用的text2video的python库调用方式。
|
Docker 容器
paddleocr 在docker环境下部署_docker部署paddleocr,90%的人看完都说好
paddleocr 在docker环境下部署_docker部署paddleocr,90%的人看完都说好
|
运维 负载均衡 监控
同时设置两张网卡的接口跃点数:影响与优化分析
在现代网络中,服务器常配有多张网卡以提升性能和冗余。本文探讨了不同跃点数配置的影响及优化策略。首先介绍了跃点数的概念及其对数据传输效率的影响。接着分析了两张网卡跃点数差异可能导致的延迟增加、负载不均衡等问题,并提出了负载均衡、优先级设置、监控调整及故障转移等优化方法,帮助网络管理员实现高效稳定的通信。
|
前端开发 C# 开发者
WPF开发者必读:MVVM模式实战,轻松构建可维护的应用程序,让你的代码更上一层楼!
【8月更文挑战第31天】在WPF应用程序开发中,MVVM(Model-View-ViewModel)模式通过分离关注点,提高了代码的可维护性和可扩展性。本文详细介绍了MVVM模式的三个核心组件:Model(数据模型)、View(用户界面)和ViewModel(处理数据绑定与逻辑),并通过示例代码展示了如何在WPF项目中实现MVVM模式。通过这种模式,开发者可以更高效地构建桌面应用程序。希望本文能帮助你在WPF开发中更好地应用MVVM模式。
864 1
|
安全 Linux 数据安全/隐私保护
驾驭Linux的权力:Root与Sudo
在 Linux 系统中,权限管理至关重要,Root 用户与 Sudo 命令为核心组件。Root 作为超级用户,拥有最高权限,可执行任意命令,但也带来较高安全风险,建议仅在必要时使用。Sudo 则允许系统管理员授予普通用户临时的 Root 权限以执行特定命令,提升了系统的安全性和管理灵活性。通过合理配置 Sudoers 文件,可以实现对用户权限的精细化管理。综合运用 Root 和 Sudo 可确保系统的安全稳定运行。
544 1
|
机器学习/深度学习 人工智能 自然语言处理

热门文章

最新文章