Spark在创建数据集时无法反序列化记录-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Spark在创建数据集时无法反序列化记录

2018-12-19 17:21:02 3108 1

我正在从S3读取大量的CSV(一切都在一个键前缀下)并创建一个强类型的Dataset。

val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
其中TradeRecord是一个案例类,通常可以通过SparkSession implicits反序列化。但是,对于某个批处理,记录无法反序列化。这是错误(省略了堆栈跟踪)

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:

  • field (class: "scala.Long", name: "deal")
  • root class: "com.company.trades.TradeRecord"
    If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

deal作为一个字段TradeRecord永远不应该在源数据(S3对象)中为空,所以它不是一个Option。

不幸的是,错误消息并没有给我任何关于CSV数据的样子,甚至是它来自哪个CSV文件的线索。该批处理包含数百个文件,因此我需要一种方法将其缩小到最多几个文件以调查该问题。

取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:06

    这是我提出的解决方案(我正在使用Spark Structured Streaming):

    val stream = spark.readStream
    .format("csv")
    .schema(schema) // a StructType defined elsewhere
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "corruptRecord")
    .load(path)

    // If debugging, check for any corrupted CSVs
    if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait
    import spark.implicits._
    stream

    .filter($"corruptRecord".isNotNull)
    .withColumn("input_file", input_file_name)
    .select($"input_file", $"corruptRecord")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()

    }

    val events = stream
    .withColumn("event", lit("I"))
    .withColumn("source", lit(sourceName))
    .as[TradeRecord]
    基本上,如果将Spark日志级别设置为Debug或更低,则会检查DataFrame是否存在损坏的记录,并将所有此类记录与其文件名一起打印出来。最终程序尝试将此DataFrame强制转换为强类型Dataset[TradeRecord]并失败。


    根据user10465355的建议,您可以加载数据:

    val events: DataFrame = ???
    过滤

    val mismatched = events.where($"deal".isNull)
    添加文件名

    import org.apache.spark.sql.functions.input_file_name

    val tagged = mismatched.withColumn("_file_name", input_file_name)
    (可选)添加块和块以及偏移量:

    import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight

    df
    .withColumn("chunk", spark_partition_id())
    .withColumn(

    "offset",
    monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))
    
    0 0
相关问答

1

回答

Spark和RDD的关系是怎样的?

2021-12-06 20:40:40 273浏览量 回答数 1

1

回答

spark部分的 RDD是什么?

2021-12-06 19:12:58 240浏览量 回答数 1

1

回答

Spark中RDD(Resilient Distributed Datasets)是什么?

2021-12-05 20:05:17 109浏览量 回答数 1

1

回答

Spark中RDD的属性是什么?

2021-12-05 20:06:54 98浏览量 回答数 1

1

回答

Spark中RDD的特点是什么?

2021-12-05 20:07:38 104浏览量 回答数 1

0

回答

请教各位大神,我想要在如下数据结构的spark rdd中提取出内嵌的字典

2019-09-26 17:37:39 534浏览量 回答数 0

2

回答

spark的RDD内容直接用saveAsTextFile保存到hdfs时会出现中文乱码现象,但在控制台用foreach打印该RDD数据显示是正常的,该怎么解决呢?

2019-01-30 15:01:08 5491浏览量 回答数 2

1

回答

在Spark Streaming Python中将RDD转换为Dataframe

2018-12-21 13:36:36 2187浏览量 回答数 1

1

回答

在一个文件中对spark RDD进行排序和排名

2018-12-12 11:27:36 3000浏览量 回答数 1

1

回答

在spark rdd级别中使用groupby的条件运算符 - scala

2018-12-06 15:33:05 4437浏览量 回答数 1
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载