背景
本文基于spark 3.1.2,且运行在yarn模式下
最近在调试 spark sql的时候遇到了空指针的问题,如下:
Caused by: java.lang.NullPointerException at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57) at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162) ... at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.proxyExpressions(SubExprEvaluationRuntime.scala:89) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.<init>(predicates.scala:53) at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:92) at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:85) at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:56) at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:101) at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2(basicPhysicalOperators.scala:246) at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2$adapted(basicPhysicalOperators.scala:245) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ... at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131)
分析
遇到以上问题,一脸懵?
我们分析一下,这里会涉及到spark的DPP(动态分区裁剪) 以及codegen部分。我们提取最小化的报错sql如下(脱敏处理):
create table test_a_pt(col1 int, col2 int,pt string) USING parquet PARTITIONED BY (pt); insert into table test_a_pt values(1,2,'20220101'),(3,4,'20220101'),(1,2,'20220101'),(3,4,'20220101'),(1,2,'20220101'),(3,4,'20220101'); drop table if exists test_b; create table test_b as select 1 as `搜索demo_uv` ,2 as `搜索demo_gmv`, 'gogo' as scenes, '2021-03-04' as date1; drop table if exists gg_gg; create table gg_gg as SELECT a.pt, a.scenes FROM ( SELECT '20220101' as pt ,'comeon' AS scenes FROM test_b where scenes='gogo' and exists(array(date1),x-> x =='2021-03-04') UNION ALL SELECT pt ,'comeon' FROM ( SELECT pt,COUNT( distinct col2) AS buy_tab_uv FROM test_a_pt where pt='20220101' GROUP BY pt ) ) a JOIN ( SELECT pt ,COUNT(distinct col2) AS buy_tab_uv FROM test_a_pt where pt='20220101' GROUP BY pt ) b ON a.pt = b.pt ; 其中exists方法是继承CodegenFallback的
运行完后,我们可以拿到对应的物理计划,
这个sql会生成对应的DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))(这个对下面的分析很重要)表达式,至于为什么会生成对应的表达式,可以看对应的逻辑计划和物理计划的生成日志,这里就不再说明。
我们直接拿堆栈信息进行分析,首先是
Task.run 这说明是在Executor端报错的
再者是FilterExec,对应的代码如下:
protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithIndexInternal { (index, iter) => val predicate = Predicate.create(condition, child.output) predicate.initialize(0) iter.filter { row => val r = predicate.eval(row) if (r) numOutputRows += 1 r } } }
这里的Predicate.create方法会根据表达式来生成对应的BasePredicate类,这个类是对输入的每一行进行布尔计算的。
按照正常的流程的话,其实是会先调用在createCodeGeneratedObject,如果报错,就会再调用createInterpretedObject方法的。
细心的同学会发现,我们的报错stack中就只有Predicate$.createInterpretedObject,那第一部的代码生成呢?去哪了?
其实我们知道表达式的代码生成是在executor端的,所以我们可以找到对应executor端的代码,可以找到对应的信息:
22/02/26 19:36:19 WARN Predicate: Expr codegen error and falling back to interpreter mode java.lang.NullPointerException at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57) at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:387) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
WARN Predicate: Expr codegen error and falling back to interpreter mode 这句就说明了该sql生成codegen的代码报错了,至于为什么生成代码失败了,下一次分析。
我们暂且就按照报错的堆栈进行分析,直接转到InterpretedPredicate类:
case class InterpretedPredicate(expression: Expression) extends BasePredicate { private[this] val subExprEliminationEnabled = SQLConf.get.subexpressionEliminationEnabled private[this] lazy val runtime = new SubExprEvaluationRuntime(SQLConf.get.subexpressionEliminationCacheMaxEntries) private[this] val expr = if (subExprEliminationEnabled) { runtime.proxyExpressions(Seq(expression)).head } else { expression }
这里有个subExprEliminationEnabled选项,这个选项是来消除公共表达式的,可以节省计算的时间,默认是开启的,注意这里很关键。
因为该选项开启了,所以我们接着往下走:
proxyExpressions的expressions.foreach(equivalentExpressions.addExprTree(_))方法:
def addExprTree( expr: Expression, addFunc: Expression => Boolean = addExpr): Unit = { val skip = expr.isInstanceOf[LeafExpression] || // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. expr.find(_.isInstanceOf[LambdaVariable]).isDefined || // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, // can cause error like NPE. (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null) if (!skip && !addFunc(expr)) { childrenToRecurse(expr).foreach(addExprTree(_, addFunc)) commonChildrenToRecurse(expr).filter(_.nonEmpty).foreach(addCommonExprs(_, addFunc)) } }
这里有个skip判断,其中有一条是expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null,这个显然是针对executor端的,因为在executor端会生成TaskContext实例。
还记得我们说的DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))表达式吗(broadcastValues 会包含FileSourceScanExec类)?在这里就很关键了,
但是从报错的堆栈信息里,明显可以知道skip是false,所以才会ReusedExchangeExec.doCanonicalize等信息,才会报NPE问题。
分析一下这里expr.isInstanceOf[PlanExpression[_]] 很显然这个判断是不能判断DynamicPruningExpression的,因为InSubqueryExec才是PlanExpression子类,才满足这个条件,
所以我们改成expr.isInstanceOf[PlanExpression[_]] -> expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined
这样我们就能解决该问题。
我们接着往下走(中间的调用看堆栈信息即可):
DataSourceScanExec的57行,
trait DataSourceScanExec extends LeafExecNode { def relation: BaseRelation def tableIdentifier: Option[TableIdentifier] protected val nodeNamePrefix: String = "" override val nodeName: String = { s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" } // Metadata that describes more details of this scan. protected def metadata: Map[String, String] protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength ...
也就是protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength这行会报错。
其中sqlContext在SparkPlan类中 @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
SparkSession.getActiveSession方法如下:
def getActiveSession: Option[SparkSession] = { if (TaskContext.get != null) { // Return None when running on executors. None } else { Option(activeThreadSession.get) } }
也就是说在executor端getActiveSession返回的是None,从而引发了NPE。
这样整个代码就完全捋顺了。
解决
关闭DPP(动态代码生成)
set spark.sql.optimizer.dynamicPartitionPruning.enabled=false
关闭公共表达式消除
set spark.sql.subexpressionElimination.enabled=false
修改代码,遇到DPP,直接跳过,并反馈给社区
这里有对应的jira
BTW 本文只是针对spark 3.1.2,对于3.2以及以上的版本代码进行了重构,具体的一些社区讨论可以参考SPARK-23731,SPARK-35742,SPARK-35798
其实解决方法1.2.3都是逐级的把影响范围缩小,至于怎么选择,看自己的选择。。。