开发者社区> 问答> 正文

Spark批量加载文件集合,并从文件级别查找每个文件中的行以及其他信息

我有使用逗号分隔符指定的文件集合,如:

hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22
我正在使用Apache Spark加载文件,所有这些都与:

val input = sc.textFile(files)
此外,我还有与每个文件相关的其他信息 - 唯一ID,例如:

File ID

hdfs://user/cloudera/date=2018-01-15 | 12345
hdfs://user/cloudera/date=2018-01-16 | 09245
hdfs://user/cloudera/date=2018-01-17 | 345hqw4
and so on
作为输出,我需要接收带有行的DataFrame,其中每行将包含相同的ID,作为从中读取该行的文件的ID。

是否有可能以某种方式将此信息传递给Spark,以便能够与线路相关联?

展开
收起
社区小助手 2018-12-12 18:11:24 2139 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    核心sql方法UDF(join如果你将File - > ID映射表示为Dataframe,你可以实现同样的目的):

    import org.apache.spark.sql.functions

    val inputDf = sparkSession.read.text(".../src/test/resources/test")

    .withColumn("fileName", functions.input_file_name())
    

    def withId(mapping: Map[String, String]) = functions.udf(
    (file: String) => mapping.get(file)
    )

    val mapping = Map(
    "file:///.../src/test/resources/test/test1.txt" -> "id1",
    "file:///.../src/test/resources/test/test2.txt" -> "id2"
    )

    val resutlDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
    resutlDf.show(false)
    结果:

    value fileName id
    row1 file:///.../src/test/resources/test/test1.txt id1
    row11 file:///.../src/test/resources/test/test1.txt id1
    row2 file:///.../src/test/resources/test/test2.txt id2
    row22 file:///.../src/test/resources/test/test2.txt id2

    text1.txt:

    row1
    row11
    text2.txt:

    row2
    row22

    2019-07-17 23:20:14
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载