Spark expression Codegen 之code代码块

简介: Spark expression Codegen 之code代码块

背景


本文基于spark 3.2.0

由于codegen涉及到的知识点比较多,我们先来说清楚code"""""",我们暂且叫做code代码块


scala 字符串插值


要想搞清楚spark的code代码块,就得现搞清楚scala 字符串插值。

scala 字符串插值是2.10.0版本引用进来的新语法规则,可以直接允许使用者将变量引用直接插入到字符串中,如下:

val name = 'LI'
println(s"My name is $name")
输出:
My name is LI

这种资料很多,大家自行查阅资料理解。

code代码块

因为这块代码比较复杂,直接拿出例子来运行:

直接找到spark CastSuite.scala 第215行如下:

test("cast string to boolean II") {
    checkEvaluation(cast("abc", BooleanType), null)

之后在javaCode.scala 输出对应的想要debug的值,如下:

 */
    def code(args: Any*): Block = {
      sc.checkLengths(args)
      if (sc.parts.length == 0) {
        EmptyBlock
      } else {
        args.foreach {
          case _: ExprValue | _: Inline | _: Block =>
          case _: Boolean | _: Byte | _: Int | _: Long | _: Float | _: Double | _: String =>
          case other => throw QueryExecutionErrors.cannotInterpolateClassIntoCodeBlockError(other)
        }
        val (codeParts, blockInputs) = foldLiteralArgs(sc.parts, args)
        // scalasytle:off
        println(s"code: $codeParts")
        println(s"blockInputs: $blockInputs")
        // scalasytle:on
        CodeBlock(codeParts, blockInputs)
      }
    }

这样,运行后我们会发现,如下结果:

code: ArrayBuffer(
          if (org.apache.spark.sql.catalyst.util.StringUtils.isTrueString(, )) {
            ,  = true;
          } else if (org.apache.spark.sql.catalyst.util.StringUtils.isFalseString(, )) {
            ,  = false;
          } else {
            isNull_0 = true;
          }
        )
blockInputs: ArrayBuffer(((UTF8String) references[0] /* literal */), value_0, ((UTF8String) references[0] /* literal */), value_0)
result: if (org.apache.spark.sql.catalyst.util.StringUtils.isTrueString(((UTF8String) references[0] /* literal */))) {
            value_0 = true;
          } else if (org.apache.spark.sql.catalyst.util.StringUtils.isFalseString(((UTF8String) references[0] /* literal */))) {
            value_0 = false;
          } else {
            isNull_0 = true;
          }
...

而这段代码刚好和Cast.scala中的 castToBooleanCode方法是一一对应的的:

private[this] def castToBooleanCode(from: DataType): CastFunction = from match {
    case StringType =>
      val stringUtils = inline"${StringUtils.getClass.getName.stripSuffix("$")}"
      (c, evPrim, evNull) =>
        val castFailureCode = if (ansiEnabled) {
          s"throw QueryExecutionErrors.invalidInputSyntaxForBooleanError($c);"
        } else {
          s"$evNull = true;"
        }
        val result = code"""
          if ($stringUtils.isTrueString($c)) {
            $evPrim = true;
          } else if ($stringUtils.isFalseString($c)) {
            $evPrim = false;
          } else {
            $castFailureCode
          }
        """
        // scalastyle:off
        println(s"result: $result")
        // scalastyle:on
      result

也就是说spark自定义的ExprValue类型的值被替换了(其实是Inline/Block/ExprValue这三种类型的值都会被替换,只不过这里没有体现),如下:

image.png

而输出的result结果就是拼接完后的完整字符串。

我们这里是为了debug,才会把结果和对应的片段打印出来,

而在spark真正处理的时候,返回的是ExprCode类型的值,在真正需要代码生成的时候,才会调用的toString的方法生成对应的字符串


code代码块之间的连接


但是我们在Cast.scala的方法中我们看到的doGenCode是先调用child.genCode的方法的:

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val eval = child.genCode(ctx)
    val nullSafeCast = nullSafeCastFunction(child.dataType, dataType, ctx)
    ev.copy(code = eval.code +
      castCode(ctx, eval.value, eval.isNull, ev.value, ev.isNull, dataType, nullSafeCast))
  }

那子节点的ExprCode怎么和父节点的ExprCode连接起来的呢?

其实这个和写代码的思路是一样的,每个子节点返回的ExprCode类型的值,都会对应为该方法体的的实现代码,返回值(包括了类型),spark额外增加了一个是否为null,如下:

case class ExprCode(var code: Block, var isNull: ExprValue, var value: ExprValue)

其中code是对应的方法体的实现代码,

isNull 是对应的是否为null,

value 代表的返回值

至于为什么会额外增加一个是否为null,还是和写代码的逻辑是一样的,因为只有不为空的情况下,代码才会正常的往下运行:

protected[this] def castCode(ctx: CodegenContext, input: ExprValue, inputIsNull: ExprValue,
    result: ExprValue, resultIsNull: ExprValue, resultType: DataType, cast: CastFunction): Block = {
    val javaType = JavaCode.javaType(resultType)
    code"""
      boolean $resultIsNull = $inputIsNull;
      $javaType $result = ${CodeGenerator.defaultValue(resultType)};
      if (!$inputIsNull) {
        ${cast(input, result, resultIsNull)}
      }
    """
  }

这里的!$inputIsNull判断,只有不为空了才进行下一步的转换操作,要不然会抛出异常。


这样把子节点的结果作为父节点的入参传入给对应的方法,这样生成的代码完全符合编码的逻辑,这样这部分也就说完了,当然这部分也是代码生成的重中之重,理解了这部分,代码生成这块就差不多了,其他的就是各个部分的实现,用心去看即可。


相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
124 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
70 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
44 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
100 0
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
77 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
104 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
72 1
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
64 1
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
55 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
108 0