This is my nested JSON file:
```json
{
  "dc_id": "dc-101",
  "source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp": 35,
      "c02_level": 1475,
      "geo": { "lat": 38.00, "long": 97.00 }
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": { "lat": 47.41, "long": -122.00 }
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": { "lat": 33.61, "long": -111.89 }
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": { "lat": 35.93, "long": -85.46 }
    }
  }
}
```
How can I read this JSON file into a DataFrame using Spark Scala? There are no array objects in the JSON file, so I can't use explode. Is there any way to do this?
```scala
import org.apache.spark.sql.functions.{array, col, explode}

// "multiline" is required because the JSON object spans multiple lines
val df = spark.read.option("multiline", true).json("data/test.json")

df
  // wrap the child structs of "source" in an array so it can be exploded
  // into one row per sensor
  .select(col("dc_id"), explode(array("source.*")) as "level1")
  .withColumn("id", col("level1.id"))
  .withColumn("ip", col("level1.ip"))
  .withColumn("temp", col("level1.temp"))
  .withColumn("description", col("level1.description"))
  .withColumn("c02_level", col("level1.c02_level"))
  .withColumn("lat", col("level1.geo.lat"))
  .withColumn("long", col("level1.geo.long"))
  .drop("level1")
  .show(false)
```
Sample output:
| dc_id  | id | ip              | temp | description                                      | c02_level | lat   | long    |
|--------|----|-----------------|------|--------------------------------------------------|-----------|-------|---------|
| dc-101 | 10 | 68.28.91.22     | 35   | Sensor attached to the container ceilings        | 1475      | 38.0  | 97.0    |
| dc-101 | 8  | 208.109.163.218 | 40   | Sensor attached to the factory ceilings          | 1346      | 33.61 | -111.89 |
| dc-101 | 13 | 67.185.72.1     | 34   | Sensor ipad attached to carbon cylinders         | 1370      | 47.41 | -122.0  |
| dc-101 | 5  | 204.116.105.67  | 40   | Sensor embedded in exhaust pipes in the ceilings | 1574      | 35.93 | -85.46  |
Rather than selecting each column one by one, you could also try writing a generic UDF that pulls out all the individual columns.
Note: tested with Spark 2.3.
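As one illustration of that "generic" idea, here is a minimal sketch of my own (not part of the original answer, reusing the same file path) that relies on star expansion to flatten every field of the exploded struct, including the nested `geo` struct, without naming each column:

```scala
import org.apache.spark.sql.functions.{array, col, explode}

val df = spark.read.option("multiline", true).json("data/test.json")

// One row per sensor; all sensor fields stay inside the "level1" struct.
val exploded = df.select(col("dc_id"), explode(array("source.*")) as "level1")

// "level1.*" expands every field of the struct without listing them by name.
val flat = exploded.select(col("dc_id"), col("level1.*"))

// "geo" is itself a struct, so expand it the same way and keep the rest.
val fullyFlat = flat.select(
  flat.columns.filterNot(_ == "geo").map(col) :+ col("geo.*"): _*
)

fullyFlat.show(false)
```

The column order will differ slightly from the table above (the `lat` and `long` fields end up last), but the content is the same, and no column names are hard-coded except `geo`.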