spark needsUnsafeRowConversion java.util.NoSuchElementException: None.get

简介: spark needsUnsafeRowConversion java.util.NoSuchElementException: None.get

spark版本 3.0.1

在spark 中存在一个bug,该bug的详细信息如下:

None.get
java.util.NoSuchElementException: None.get
scala.None$.get(Option.scala:529)
scala.None$.get(Option.scala:527)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3200)
org.apache.spark.sql.Dataset.rdd(Dataset.scala:3198)

根据源码定位FileSourceScanExec,定位到如下位置:

 SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled

SparkSession.getActiveSession.get的内容如下:

/**
   * Returns the active SparkSession for the current thread, returned by the builder.
   *
   * @note Return None, when calling this function on executors
   *
   * @since 2.2.0
   */
  def getActiveSession: Option[SparkSession] = {
    if (TaskContext.get != null) {
      // Return None when running on executors.
      None
    } else {
      Option(activeThreadSession.get)
    }
  }

正如注释所写的一样,当在executors端获取SparkSession的时候,直接返回None。 为什么直接返回none,可以参考spark-pr-21436

当然这个问题,已经有人发现了并且提交了pr-29667,所以拿到commitID(37a660866342f2d64ad2990a5596e67cfdf044c0)直接cherry-pick就ok了,


分析一下原因:

其实该原因就是同一个jvm中,两个不同的线程同步调用,就如unit test所示:

test("SPARK-32813: Table scan should work in different thread") {
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    var session: SparkSession = null
    SparkSession.cleanupAnyExistingSession()
    withTempDir { tempDir =>
      try {
        val tablePath = tempDir.toString + "/table"
        val df = ThreadUtils.awaitResult(Future {
          session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
          session.createDataFrame(
            session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
            StructType(Seq(
              StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
            .write.parquet(tablePath)
          session.read.parquet(tablePath)
        }(ExecutionContext.fromExecutorService(executor1)), 1.minute)
        ThreadUtils.awaitResult(Future {
          assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
        }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
      } finally {
        executor1.shutdown()
        executor2.shutdown()
        session.stop()
      }
    }
  }


相关文章
|
6月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
157 0
|
1月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
23 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
2月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
41 5
|
5月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合
|
6月前
|
SQL 分布式计算 Java
HiveOnSpark 报错:java.lang.IllegalStateException(Connection to remote Spark driver was lost)‘ Last kno
Hive On Spark 测试时遇到`java.lang.IllegalStateException`和`FileNotFoundException`,问题根源是 Spark 缺少 `hive-exec-3.1.3.jar`。解决方法:从 `$HIVE_HOME/lib/`复制该 jar 到 `$SPARK_HOME/jars/`,并使用 `hdfs dfs -put`命令将其上传至 HDFS 的 `/spark-jars/`(根据实际情况调整路径)。重启 Hive 元数据服务后问题解决。
245 0
HiveOnSpark 报错:java.lang.IllegalStateException(Connection to remote Spark driver was lost)‘ Last kno
|
5月前
|
SQL JSON 分布式计算
|
5月前
|
SQL 分布式计算 Java
|
6月前
|
分布式计算 Java 测试技术
Spark 单元测试报Error:(26, 16) java: 程序包sun.misc不存在
Spark 单元测试报Error:(26, 16) java: 程序包sun.misc不存在
110 0
|
6月前
|
分布式计算 Java Scala
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
|
6月前
|
SQL 分布式计算 Hadoop
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
181 0