我有一个或多个csv文件,我必须在pyspark中合并:
档案1:
c1,c2,c3
1,3,4
文件2:
c4,c5,c6
4,5,6
档案3
c1,c2
7,8
我需要合并文件,以便结果将是:
c1,c2,c3,c4,c5,c6
1,2,3,null,null,null
null,null,null,4,5,6
7,8,null,null,null,null
我试过了:
使用load方法加载文件夹中的所有文件:
spark.read.format("csv").option("header","true")
使用合并合并文件。
两者都只使用了其中一个文件架构
c1,c2,c3
1,3,4
4,5,6
7,8
读取所有文件 - f1,f2,f3并合并列名称。然后,对于每个文件,找到补充列并生成带有lit(null)的新列。最后,通过按顺序选择列名来联合所有dfs。这是scala解决方案。
val f1 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f1.csv")
val f2 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f2.csv")
val f3 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f3.csv")
val fall = f1.columns.union(f2.columns).union(f3.columns).distinct
val f1c = fall.diff(f1.columns)
val f1a = f1c.foldLeft(f1)( (acc,r) => acc.withColumn(r,lit(null)) )
val f2c = fall.diff(f2.columns)
val f2a = f2c.foldLeft(f2)( (acc,r) => acc.withColumn(r,lit(null)) )
val f3c = fall.diff(f3.columns)
val f3a = f3c.foldLeft(f3)( (acc,r) => acc.withColumn(r,lit(null)) )
val result = f1a.select(fall.head,fall.tail:_).union(f2a.select(fall.head,fall.tail:_)).union(f3a.select(fall.head,fall.tail:_*))
result.printSchema
result.show(false)
结果:
root
|-- c1: integer (nullable = true)
|-- c2: integer (nullable = true)
|-- c3: integer (nullable = true)
|-- c4: integer (nullable = true)
|-- c5: integer (nullable = true)
|-- c6: integer (nullable = true)
c1 | c2 | c3 | c4 | c5 | c6 |
---|---|---|---|---|---|
1 | 3 | 4 | null | null | null |
null | null | null | 4 | 5 | 6 |
7 | 8 | null | null | null | null |
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。