我正在使用官方MongoDB Spark Connector从MongoDB集合中读取Spark中的数据,其代码如下:
val spark = SparkSession.
builder().
appName("MongoDB to SQL").
getOrCreate()
val df = MongoSpark.load(spark, readConfig)
df.count()
readConfig是MongoDB的标准读配置,它工作正常。我遇到的问题是我从MongoDB获得一些日期/时间为String,它无法将其转换为Spark类型TimestampValue:
INFO DAGScheduler: Job 1 failed: count at transfer.scala:159, took 3,138191 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver):
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a TimestampType (value: BsonString{value='2999.12.31 14:09:34'})
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:200)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
从我在调用df.printSchema()之前看到的.count()有问题的属性被列为
| | | |-- endDate: string (nullable = true)
在MongoDB中,endDate也存储为String。Spark是否在此处执行额外步骤来检测架构?然后它没有投出它......?从https://github.com/mongodb/mongo-spark/blob/master/src/main/scala/com/mongodb/spark/sql/MapFunctions.scala#L181查看源代码,它只进行简单的映射,
使用的版本:Mongo-Scala-Driver 2.4.0,Mongo-Spark-Connector 2.3.0,Spark 2.3.1
你需要将日期作为字符串转换为时间戳(包括时区)使用unix_timestamp并将其转换为TimestampType
可以尝试查看此示例:https: //docs.databricks.com/_static/notebooks/timestamp-conversion.html
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。