Spark DPP(动态分区裁剪)导致的DataSourceScanExec NullPointerException问题分析以及解决

简介: Spark DPP(动态分区裁剪)导致的DataSourceScanExec NullPointerException问题分析以及解决

背景


本文基于spark 3.1.2,且运行在yarn模式下

最近在调试 spark sql的时候遇到了空指针的问题,如下:

 Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57)
  at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162)
    ...
    at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.proxyExpressions(SubExprEvaluationRuntime.scala:89)
  at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.<init>(predicates.scala:53)
  at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:92)
  at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:85)
  at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:56)
  at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:101)
  at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2(basicPhysicalOperators.scala:246)
  at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2$adapted(basicPhysicalOperators.scala:245)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    ...
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)

分析

遇到以上问题,一脸懵?

我们分析一下,这里会涉及到spark的DPP(动态分区裁剪) 以及codegen部分。我们提取最小化的报错sql如下(脱敏处理):

create table test_a_pt(col1 int, col2 int,pt string) USING parquet PARTITIONED BY (pt);
insert into table test_a_pt values(1,2,'20220101'),(3,4,'20220101'),(1,2,'20220101'),(3,4,'20220101'),(1,2,'20220101'),(3,4,'20220101');
drop table if exists test_b;
create table test_b as select 1 as `搜索demo_uv` ,2 as `搜索demo_gmv`, 'gogo' as scenes, '2021-03-04' as date1;
drop table if exists gg_gg;
create table gg_gg as 
SELECT  a.pt,
        a.scenes
FROM    (
            SELECT   '20220101' as pt 
                     ,'comeon' AS scenes
            FROM    test_b where scenes='gogo' and exists(array(date1),x-> x =='2021-03-04')
            UNION ALL
            SELECT  pt
                     ,'comeon'
            FROM    (
                        SELECT  pt,COUNT( distinct col2) AS buy_tab_uv
                        FROM    test_a_pt
                        where pt='20220101'
                        GROUP BY pt 
                    ) 
        ) a
JOIN    (
            SELECT  pt ,COUNT(distinct col2) AS buy_tab_uv
                    FROM  test_a_pt
                    where pt='20220101'
                    GROUP BY pt 
        ) b
ON      a.pt = b.pt
;
其中exists方法是继承CodegenFallback的

运行完后,我们可以拿到对应的物理计划,

image.png

image.png

image.png

这个sql会生成对应的DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))(这个对下面的分析很重要)表达式,至于为什么会生成对应的表达式,可以看对应的逻辑计划和物理计划的生成日志,这里就不再说明。

我们直接拿堆栈信息进行分析,首先是

Task.run 这说明是在Executor端报错的

再者是FilterExec,对应的代码如下:

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val predicate = Predicate.create(condition, child.output)
      predicate.initialize(0)
      iter.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }

这里的Predicate.create方法会根据表达式来生成对应的BasePredicate类,这个类是对输入的每一行进行布尔计算的。

按照正常的流程的话,其实是会先调用在createCodeGeneratedObject,如果报错,就会再调用createInterpretedObject方法的。

细心的同学会发现,我们的报错stack中就只有Predicate$.createInterpretedObject,那第一部的代码生成呢?去哪了?

其实我们知道表达式的代码生成是在executor端的,所以我们可以找到对应executor端的代码,可以找到对应的信息:

22/02/26 19:36:19 WARN Predicate: Expr codegen error and falling back to interpreter mode
java.lang.NullPointerException
  at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57)
  at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:387)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

WARN Predicate: Expr codegen error and falling back to interpreter mode 这句就说明了该sql生成codegen的代码报错了,至于为什么生成代码失败了,下一次分析。

我们暂且就按照报错的堆栈进行分析,直接转到InterpretedPredicate类:

case class InterpretedPredicate(expression: Expression) extends BasePredicate {
  private[this] val subExprEliminationEnabled = SQLConf.get.subexpressionEliminationEnabled
  private[this] lazy val runtime =
    new SubExprEvaluationRuntime(SQLConf.get.subexpressionEliminationCacheMaxEntries)
  private[this] val expr = if (subExprEliminationEnabled) {
    runtime.proxyExpressions(Seq(expression)).head
  } else {
    expression
  }

这里有个subExprEliminationEnabled选项,这个选项是来消除公共表达式的,可以节省计算的时间,默认是开启的,注意这里很关键。

因为该选项开启了,所以我们接着往下走:

proxyExpressions的expressions.foreach(equivalentExpressions.addExprTree(_))方法:

def addExprTree(
      expr: Expression,
      addFunc: Expression => Boolean = addExpr): Unit = {
    val skip = expr.isInstanceOf[LeafExpression] ||
      // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
      // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
      expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
      // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor,
      // can cause error like NPE.
      (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
    if (!skip && !addFunc(expr)) {
      childrenToRecurse(expr).foreach(addExprTree(_, addFunc))
      commonChildrenToRecurse(expr).filter(_.nonEmpty).foreach(addCommonExprs(_, addFunc))
    }
  }

这里有个skip判断,其中有一条是expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null,这个显然是针对executor端的,因为在executor端会生成TaskContext实例。

还记得我们说的DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))表达式吗(broadcastValues 会包含FileSourceScanExec类)?在这里就很关键了,

但是从报错的堆栈信息里,明显可以知道skip是false,所以才会ReusedExchangeExec.doCanonicalize等信息,才会报NPE问题。

分析一下这里expr.isInstanceOf[PlanExpression[_]] 很显然这个判断是不能判断DynamicPruningExpression的,因为InSubqueryExec才是PlanExpression子类,才满足这个条件,

所以我们改成expr.isInstanceOf[PlanExpression[_]] -> expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined

这样我们就能解决该问题。


我们接着往下走(中间的调用看堆栈信息即可):

DataSourceScanExec的57行,

trait DataSourceScanExec extends LeafExecNode {
  def relation: BaseRelation
  def tableIdentifier: Option[TableIdentifier]
  protected val nodeNamePrefix: String = ""
  override val nodeName: String = {
    s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
  }
  // Metadata that describes more details of this scan.
  protected def metadata: Map[String, String]
  protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength
  ...

也就是protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength这行会报错。

其中sqlContext在SparkPlan类中 @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull

SparkSession.getActiveSession方法如下:

def getActiveSession: Option[SparkSession] = {
    if (TaskContext.get != null) {
      // Return None when running on executors.
      None
    } else {
      Option(activeThreadSession.get)
    }
  }

也就是说在executor端getActiveSession返回的是None,从而引发了NPE。

这样整个代码就完全捋顺了。


解决


关闭DPP(动态代码生成)

set spark.sql.optimizer.dynamicPartitionPruning.enabled=false

关闭公共表达式消除

set spark.sql.subexpressionElimination.enabled=false

修改代码,遇到DPP,直接跳过,并反馈给社区

这里有对应的jira

BTW 本文只是针对spark 3.1.2,对于3.2以及以上的版本代码进行了重构,具体的一些社区讨论可以参考SPARK-23731,SPARK-35742,SPARK-35798


其实解决方法1.2.3都是逐级的把影响范围缩小,至于怎么选择,看自己的选择。。。


相关文章
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
186 1
Spark快速大数据分析PDF下载读书分享推荐
|
7月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
143 0
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
164 2
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
88 0
|
2月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
57 0
|
2月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
60 0
|
4月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23723 42
|
4月前
|
分布式计算 并行计算 数据处理
|
7月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56606 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用