开发者社区> 问答> 正文

如何将csv数据集与不同的标题(名称和计数)结合起来?

社区小助手 2018-12-21 14:01:38 447

我有一个或多个csv文件,我必须在pyspark中合并:

档案1:

c1,c2,c3
1,3,4
文件2:

c4,c5,c6
4,5,6
档案3

c1,c2
7,8
我需要合并文件,以便结果将是:

c1,c2,c3,c4,c5,c6
1,2,3,null,null,null
null,null,null,4,5,6
7,8,null,null,null,null
我试过了:

使用load方法加载文件夹中的所有文件:

spark.read.format("csv").option("header","true")

使用合并合并文件。

两者都只使用了其中一个文件架构

c1,c2,c3
1,3,4
4,5,6
7,8

分享到
取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:26

    读取所有文件 - f1,f2,f3并合并列名称。然后,对于每个文件,找到补充列并生成带有lit(null)的新列。最后,通过按顺序选择列名来联合所有dfs。这是scala解决方案。

    val f1 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f1.csv")
    val f2 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f2.csv")
    val f3 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("in/f3.csv")

    val fall = f1.columns.union(f2.columns).union(f3.columns).distinct

    val f1c = fall.diff(f1.columns)
    val f1a = f1c.foldLeft(f1)( (acc,r) => acc.withColumn(r,lit(null)) )

    val f2c = fall.diff(f2.columns)
    val f2a = f2c.foldLeft(f2)( (acc,r) => acc.withColumn(r,lit(null)) )

    val f3c = fall.diff(f3.columns)
    val f3a = f3c.foldLeft(f3)( (acc,r) => acc.withColumn(r,lit(null)) )

    val result = f1a.select(fall.head,fall.tail:_).union(f2a.select(fall.head,fall.tail:_)).union(f3a.select(fall.head,fall.tail:_*))
    result.printSchema
    result.show(false)
    结果:

    root
    |-- c1: integer (nullable = true)
    |-- c2: integer (nullable = true)
    |-- c3: integer (nullable = true)
    |-- c4: integer (nullable = true)
    |-- c5: integer (nullable = true)
    |-- c6: integer (nullable = true)

    c1 c2 c3 c4 c5 c6
    1 3 4 null null null
    null null null 4 5 6
    7 8 null null null null
    0 0
+ 订阅

时时分享云计算技术内容,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。

推荐文章