使用sparkStreaming消费数据,并使用Dstream的 saveAsTextFile保存数据到hdfs中,通过使用这个方法,生成的文件夹存在问题,
代码例子如下:
resultRdd.map(x=>x).saveAsTextFiles("hdfs:ip//data/storage/20181010/"+(new Date())) //new Date()自行转化
ssc.start()
ssc.awaitermination()
而hsfs中目录显示为
/data/storage/20181010/201810100708223-1547016648000
/data/storage/20181010/201810100708223-1547016652000
/data/storage/20181010/201810100708223-1547016658000
.........................................
从中发现最后面多了一条横杠 -和时间戳1547016648000,是根据间隔时间自动生成的,但是我不想要他后面的-1547016648000,
并且201810100708223日期固定住了
查看saveAsTextFiles源码
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix,time)
rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc)
}
saveAsTextFiles方法中也是调用了saveAsTextFile方法,其中有个添加时间戳的方法。
于是我根据源码自己使用foreachRDD,生成文件使用saveAsTextFile
resultRdd.foreachRDD{
rdd=>{}
rdd.map(x=>x).saveAsTextFile("hdfs:ip//data/storage/20181010/"+(new Date())) //new Date()自行转化
}
ssc.start()
ssc.awaitermination()
现在hsfs中目录显示为
/data/storage/20181010/201810100708223
/data/storage/20181010/201810100708460
达到自己想要的结果,根据streaming 间隔时间生成文件夹,并其中包含文件。