开发者社区> 问答> 正文

如何在pyspark中读取多级json?

Json Structure is -:
aa.json

[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]
用于读取DataFrame的代码:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

a=sqlContext.read.option('multiline',"true").json('aa.json');

a.show()
foofoo1
nullnull

a.printSchema()
root
|-- foo: string (nullable = true)
|-- foo1: string (nullable = true)
以下是读取此json的行,它可以解析模式而不是数据。

展开
收起
社区小助手 2018-12-19 15:58:25 2730 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    应用一些正则表达式并转换为rdd可能对您有用。

    先使用textFile以下方法读取文件:

    a=spark.read.option('multiline',"true").text('aa.json')
    a.show(truncate=False)

    +-------------------------------------+

    |value |

    +-------------------------------------+

    |[[{"foo":"test1"},{"foo1":"test21"}],|

    |[{"foo":"test2"},{"foo1":"test22"}], |

    |[{"foo":"test3"},{"foo1":"test23"}]] |

    +-------------------------------------+

    现在我们可以使用pyspark.sql.functions.regexp_replace从每行中删除额外的方括号和尾随逗号:

    from pyspark.sql.functions import regexp_replace
    a = a.select(regexp_replace("value", "(^[(?=[))|((?<=])]$)|(,$)", "").alias("value"))
    a.show(truncate=False)

    +-----------------------------------+

    |value |

    +-----------------------------------+

    |[{"foo":"test1"},{"foo1":"test21"}]|

    |[{"foo":"test2"},{"foo1":"test22"}]|

    |[{"foo":"test3"},{"foo1":"test23"}]|

    +-----------------------------------+

    这里的模式是逻辑或以下模式:

    ^[(?=[):字符串开头后跟[[(第二[个是非捕获组)
    (?<=])]$:]]在字符串的末尾(第]一个是非捕获组)
    ,$:字符串末尾的逗号
    任何匹配的模式都将替换为空字符串。

    现在转换为rdd并使用json.loads将行解析为字典列表。然后将所有这些字典合并到一个字典中并调用pyspark.sql.Row构造函数。最后调用.toDF转换回DataFrame。

    From How to merge two dictionaries in a single expression?

    This code works for python 2 and 3

    def merge_two_dicts(x, y):

    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z
    

    import json
    from pyspark.sql import Row
    from functools import reduce

    a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()

    +-----+------+

    | foo| foo1|

    +-----+------+

    |test1|test21|

    |test2|test22|

    |test3|test23|

    +-----+------+

    2019-07-17 23:23:00
    赞同 展开评论 打赏
问答分类:
问答标签:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
File Format Benchmark - Avro, JSON, ORC, & Parquet 立即下载
Data Wrangling with PySpark for Data Scientists Who Know Pandas 立即下载
LEARNINGS USING SPARK STREAMING & DATAFRAMES FOR WALMART SEARCH 立即下载