Json Structure is -:
aa.json
[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]
用于读取DataFrame的代码:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
a=sqlContext.read.option('multiline',"true").json('aa.json');
a.show() | |
---|---|
foo | foo1 |
null | null |
a.printSchema()
root
|-- foo: string (nullable = true)
|-- foo1: string (nullable = true)
以下是读取此json的行,它可以解析模式而不是数据。
应用一些正则表达式并转换为rdd可能对您有用。
先使用textFile以下方法读取文件:
a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
现在我们可以使用pyspark.sql.functions.regexp_replace从每行中删除额外的方括号和尾随逗号:
from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", "(^[(?=[))|((?<=])]$)|(,$)", "").alias("value"))
a.show(truncate=False)
这里的模式是逻辑或以下模式:
^[(?=[):字符串开头后跟[[(第二[个是非捕获组)
(?<=])]$:]]在字符串的末尾(第]一个是非捕获组)
,$:字符串末尾的逗号
任何匹配的模式都将替换为空字符串。
现在转换为rdd并使用json.loads将行解析为字典列表。然后将所有这些字典合并到一个字典中并调用pyspark.sql.Row构造函数。最后调用.toDF转换回DataFrame。
How to merge two dictionaries in a single expression?
def merge_two_dicts(x, y):
z = x.copy() # start with x's keys and values
z.update(y) # modifies z with y's keys and values & returns None
return z
import json
from pyspark.sql import Row
from functools import reduce
a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。