增加 batch 大小:
这种方法很容易理解,batch 越大,从外部接收的 event 就越多,内存积累的数据也就越多,那么输出的文件数也就会变少,比如上边的时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但别高兴太早,实时业务能等那么久吗,本来人家10s看到结果更新一次,现在要等快两分钟,是人都会骂娘。所以这种方法适用的场景是消息实时到达,但不想挤压在一起处理,因为挤压在一起处理的话,批处理任务在干等,这时就可以采用这种方法。
Coalesce:
小文件的基数是 batch_number * partition_number,而第一种方法是减少 batch_number,那么这种方法就是减少 partition_number 了,这个 api 不细说,就是减少初始的分区个数。看过 spark 源码的童鞋都知道,对于窄依赖,一个子 RDD 的 partition 规则继承父 RDD,对于宽依赖(就是那些个xxxByKey操作),如果没有特殊指定分区个数,也继承自父 rdd。那么初始的 SourceDstream 是几个 partiion,最终的输出就是几个 partition。所以 Coalesce 大法的好处就是,可以在最终要输出的时候,来减少一把 partition 个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个 batch 是不算结束的。那么一个 batch 的处理时延必定增长,batch 挤压会逐渐增大。
Spark Streaming 外部来处理:
我们既然把数据输出到 hdfs,那么说明肯定是要用 Hive 或者 Spark Sql 这样的“sql on hadoop”系统类进一步进行数据分析,而这些表一般都是按照半小时或者一小时、一天,这样来分区的(注意不要和 Spark Streaming 的分区混淆,这里的分区,是用来做分区裁剪优化的),那么我们可以考虑在 Spark Streaming 外再启动定时的批处理任务来合并 Spark Streaming 产生的小文件。这种方法不是很直接,但是却比较有用,“性价比”较高,唯一要注意的是,批处理的合并任务在时间切割上要把握好,搞不好就可能会去合并一个还在写入的 Spark Streaming 小文件。
自己调用 foreach 去 append:
Spark Streaming 提供的 foreach 这个 outout 类 api (一种 Action 操作),可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个 batch 在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS 上的文件不支持修改,但是很多都支持追加,那么每个 batch 的每个 partition 就对应一个输出文件,每次都去追加这个 partition 对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。所以大概就是一直32个文件。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。