SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明

简介: SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明

背景


对于在在RangeExec中出现的GenerateUnsafeProjection.createCode的方法进行说明


分析


对应的代码为:

  def createCode(
      ctx: CodegenContext,
      expressions: Seq[Expression],
      useSubexprElimination: Boolean = false): ExprCode = {
    val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination)
    val exprSchemas = expressions.map(e => Schema(e.dataType, e.nullable))
    val numVarLenFields = exprSchemas.count {
      case Schema(dt, _) => !UnsafeRow.isFixedLength(dt)
      // TODO: consider large decimal and interval type
    }
    val rowWriterClass = classOf[UnsafeRowWriter].getName
    val rowWriter = ctx.addMutableState(rowWriterClass, "rowWriter",
      v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});")
    // Evaluate all the subexpression.
    val evalSubexpr = ctx.subexprFunctionsCode
    val writeExpressions = writeExpressionsToBuffer(
      ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true)
//   println(s"writeExpressions: $writeExpressions")
    val code =
      code"""
         |$rowWriter.reset();
         |$evalSubexpr
         |$writeExpressions
       """.stripMargin
    // `rowWriter` is declared as a class field, so we can access it directly in methods.
//    println(s"code: $code")
    ExprCode(code, FalseLiteral, JavaCode.expression(s"$rowWriter.getRow()", classOf[UnsafeRow]))
  }

其中 expressions的值为Seq(BoundReference(0, long, false))

useSubexpreElimination为false


val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination)

这里只是代码生成,exprEvals的值就是range_value_0

因为useSubexprElimination 是false,所以不会进行公共代码的消除

val exprSchemas = expressions.map(e => Schema(e.dataType, e.nullable))

得到对应的表达式的schema

val numVarLenFields =

计算出非固定长度字段的个数,用于初始化UnsafeRowWriter

val rowWriter =

定义并初始化rowWriter,该rowWriter是全局范围的,生成的代码如下:

 private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
 public void init(int index, scala.collection.Iterator[] inputs) {
 ...
 range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
 }

val evalSubexpr = ctx.subexprFunctionsCode

这里为空字符串

val writeExpressions = writeExpressionsToBuffer

private def writeExpressionsToBuffer(
    ctx: CodegenContext,
    row: String,
    inputs: Seq[ExprCode],
    schemas: Seq[Schema],
    rowWriter: String,
    isTopLevel: Boolean = false): String = {
  val resetWriter = if (isTopLevel) {
    // For top level row writer, it always writes to the beginning of the global buffer holder,
    // which means its fixed-size region always in the same position, so we don't need to call
    // `reset` to set up its fixed-size region every time.
    if (inputs.map(_.isNull).forall(_ == FalseLiteral)) {
      // If all fields are not nullable, which means the null bits never changes, then we don't
      // need to clear it out every time.
      ""
    } else {
      s"$rowWriter.zeroOutNullBytes();"
    }
  } else {
    s"$rowWriter.resetRowWriter();"
  }
  val writeFields = inputs.zip(schemas).zipWithIndex.map {
    case ((input, Schema(dataType, nullable)), index) =>
      val dt = UserDefinedType.sqlType(dataType)
      val setNull = dt match {
        case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
          // Can't call setNullAt() for DecimalType with precision larger than 18.
          s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
        case CalendarIntervalType => s"$rowWriter.write($index, (CalendarInterval) null);"
        case _ => s"$rowWriter.setNullAt($index);"
      }
      val writeField = writeElement(ctx, input.value, index.toString, dt, rowWriter)
      if (!nullable) {
        s"""
           |${input.code}
           |${writeField.trim}
         """.stripMargin
      } else {
        s"""
           |${input.code}
           |if (${input.isNull}) {
           |  ${setNull.trim}
           |} else {
           |  ${writeField.trim}
           |}
         """.stripMargin
      }
  }
  val writeFieldsCode = if (isTopLevel && (row == null || ctx.currentVars != null)) {
    // TODO: support whole stage codegen
    writeFields.mkString("\n")
  } else {
    assert(row != null, "the input row name cannot be null when generating code to write it.")
    ctx.splitExpressions(
      expressions = writeFields,
      funcName = "writeFields",
      arguments = Seq("InternalRow" -> row))
  }
  s"""
     |$resetWriter
     |$writeFieldsCode
   """.stripMargin
}

val resetWriter =

因为inputs为null为false,所以resetWriter的值为空字符串


val writeFields =

因为inputs的类型是LONG类型,所以对应到val writeField = writeElement(ctx, input.value, index.toString, dt, rowWriter)代码为:

case _ => s"$writer.write($index, $input);",所以生成的代码为:

 range_mutableStateArray_0[0].write(0, range_value_0)
  • val writeFieldsCode =以及后面的代码组装
    对每一个变量的赋值按照换行符进行分隔。
  • val code =
    组装成ExprCode的code部分,生成的代码如下:
range_mutableStateArray_0[0].reset();
range_mutableStateArray_0[0].write(0, range_value_0);

最后ExprCode的完整部分如下:

 ExprCode(range_mutableStateArray_0[0].reset();
range_mutableStateArray_0[0].write(0, range_value_0);,false,(range_mutableStateArray_0[0].getRow()))


相关文章
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
164 0
|
分布式计算 Java Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
218 0
|
SQL 分布式计算 Spark
Spark中的WholeStageCodegenExec(全代码生成)
Spark中的WholeStageCodegenExec(全代码生成)
491 0
Spark中的WholeStageCodegenExec(全代码生成)
|
1月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
83 1
Spark快速大数据分析PDF下载读书分享推荐
|
12天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
56 3
|
12天前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
36 2
|
2月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
152 59
|
21天前
|
分布式计算 Hadoop 大数据
Hadoop与Spark在大数据处理中的对比
【7月更文挑战第30天】Hadoop和Spark在大数据处理中各有优势,选择哪个框架取决于具体的应用场景和需求。Hadoop适合处理大规模数据的离线分析,而Spark则更适合需要快速响应和迭代计算的应用场景。在实际应用中,可以根据数据处理的需求、系统的可扩展性、成本效益等因素综合考虑,选择适合的框架进行大数据处理。
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
2月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合