Spark中的WholeStageCodegenExec(全代码生成)

简介: Spark中的WholeStageCodegenExec(全代码生成)

背景


在之前的文章中Spark DPP(动态分区裁剪)导致的DataSourceScanExec NullPointerException问题分析以及解决,我们直接跳过了动态代码生成失败这版本一步部分,这次我们来分析一下,SQL还是在以上提到的文章中。


分析


运行完该sql,我们可以看到如下的物理计划:

image.png

image.png

我们看到FilterExec和ColumnarRoRowExec并没有在一个WholeStageCodegen 中, 这是为什么呢?

这是因为exists方法是继承自CodegenFallback Trait。

我们可以跟踪一下物理规则CollapseCodegenStages,对应的代码如下:

private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = {
    plan match {
      // For operators that will output domain object, do not insert WholeStageCodegen for it as
      // domain object can not be written into unsafe row.
      case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
        plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
      case plan: LocalTableScanExec =>
        // Do not make LogicalTableScanExec the root of WholeStageCodegen
        // to support the fast driver-local collect/take paths.
        plan
      case plan: CodegenSupport if supportCodegen(plan) =>
        // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
        // it can't support whole-stage-codegen at the same time.
        assert(!plan.supportsColumnar)
        WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
      case other =>
        other.withNewChildren(other.children.map(insertWholeStageCodegen))
    }

在FilterExec做supportCodegen判断的时候,还会扫描他的表达式是否存在CodegenFallback的子类,如果存在则不会把FilterExec做全代码代码生成。显然这里是不满足条件的, 而他的child,也就是ColumnarRoRowExec 他是符合的,所以ColumnarRoRowExec是可以用全代码代码生成的。

所以在driver端生成RDD的时候,FilterExec还是会走自身的doExecute方法,也就是先会运行createCodeGeneratedObject代码部分,最终还是会调用到subexpressionElimination这个方法,从而报错。


其实我们可以修改一下对应的sql,让它走全代码生成,去掉exists的过滤条件.

FROM test_b where scenes='gogo' and exists(array(date1),x-> x =='2021-03-04') -> FROM test_b where scenes='gogo'

这样我们可以得到如下的物理计划:

image.png

image.png


可以看到FilterExec和ColumnerExec是在一个WholeStageCodegen中,从代码级别来说,就是会生成类似如下的数据结构:

WholeStageCodegenExec(FilterExec(ColumnarToRowExec(InputAdapter(FileSourceScanExec))))

所在在driver端生成对应RDD的时候,就会走到WholeStageCodegenExec的doExecute方法,如下:

override def doExecute(): RDD[InternalRow] = {
    val (ctx, cleanedSource) = doCodeGen()
    // try to compile and fallback if it failed
    val (_, compiledCodeStats) = try {
      CodeGenerator.compile(cleanedSource)
    } catch {
      case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
        // We should already saw the error message
        logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
        return child.execute()
    }
    ...
     val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
      rdds.head.mapPartitionsWithIndex { (index, iter) =>
        val (clazz, _) = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(iter)
    ...

doCodeGen方法还是按照produce->doProduce->consume-doConsume>的调用方式来组织各个物理计划间的代码。

而最终还是会调用到FilterExec的doConsume代码,而这里面涉及到的DPP的表达式代码就是通过代码生成的:

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    val numOutput = metricTerm(ctx, "numOutputRows")
    /**
     * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
     */
    def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
      val bound = BindReferences.bindReference(c, attrs)
      val evaluated = evaluateRequiredVariables(child.output, in, c.references)
      // Generate the code for the predicate.
      val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
      val nullCheck = if (bound.nullable) {
        s"${ev.isNull} || "
      } else {
    ...

通过genCode方法调用expression的 doGenCode方法。

对应到我们sql对应的表达式为DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) .

而DynamicPruningExpression的还是调用child的genCode的方法,也就是最终会调用到InSubqueryExec的doGenCode方法:

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    prepareResult()
    inSet.doGenCode(ctx, ev)
  }
  ...
private def prepareResult(): Unit = {
    require(resultBroadcast != null, s"$this has not finished")
    if (result == null) {
      result = resultBroadcast.value
    }
  }

prepareResult方法是准备broadcast的值。

inSet 方法只是正常对一行row进行计算。

全代码生成完后,进行代码的编译,如下:

val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
      rdds.head.mapPartitionsWithIndex { (index, iter) =>
        val (clazz, _) = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(iter))

这里的rdds是会调用inputRDDs方法,而该方法是递归调用child的inputRDDs方法,最终会调用到ColumnarToRowExec的inputRDDs方法,

从而调用InputAdapter的executeColumnar方法,最终会调用到FileSourceScanExec的doExecuteColumnar方法,从而生成对应的RDD。

生成的代码会在executor端再次被编译,从而进行运算。


注意:resultBroadcast的值是在WholeStageCodegenExec的方法execute中完成的:

final def execute(): RDD[InternalRow] = executeQuery {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
    }
    doExecute()
  }

其中executeQuery方法如下:

protected final def executeQuery[T](query: => T): T = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      waitForSubqueries()
      query
    }
  }
  ...
  final def prepare(): Unit = {
    // doPrepare() may depend on it's children, we should call prepare() on all the children first.
    children.foreach(_.prepare())
    synchronized {
      if (!prepared) {
        prepareSubqueries()
        doPrepare()
        prepared = true
      }
    }
  }

prepare和prepareSubqueries方法递归调用,从而使子节点都能把准备工作做好,如这里的driver端的广播,从而在executor端能够获取对应的广播变量。

至此,分析就完成了。

相关文章
|
SQL 分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
189 0
|
分布式计算 Spark
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
142 0
|
分布式计算 Java Spark
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(3)
252 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
48 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
104 0
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
94 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
115 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
86 1