扩疆 2016-06-13 3993浏览量
在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。
Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;
可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。
Flume主要分为 Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application。数据从源头经过Agent的这几个组件最后到达目的地。一个Flume 服务可同时运行多个Agent,大致架构可参照下图:
对照这个图,作一些说明;
下图是个简单的多Channel、Sink情况;Flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。
现在做的埋点数据导入ODPS的情况是,每天夜里1点左右把前一天的日志文件copy到Flume监控的目录,Flume处理新加入的文件。最终数据存储到ODPS。
遇到的问题:
在尝试了几种方法后,最后选择自定义了一个拦截器实现(UaLogFilteringInterceptor),能很好的达到目前的需求,他主要做如下两件事:
这里顺带说下ODPS的Flume插件,他主要根据ODPS的特点自定义实现了一个Sink,在Flume的配置文件中配置使用该Sink,配置好该Sink的各个配置项,主要包含连接ODPS和使用对应表的。
主要用到事务实现有针对MemoryChannel的MemoryTransaction和FileChannel的FileBackedTransaction;
Event从Source PUT到Channel和从Channel Take到Sink后落地,这两个步骤都包裹在事务中;我这里说下MemoryTransaction大致实现。
MemoryTransaction 主要用到了两个双向阻塞队列(LinkedBlockingDeque)putList和takeList作为缓冲区,同时配合使用MemoryChannel中的LinkedBlockingDeque queue;队列的大小通过Flume的配置初始化好;
PUT事务
Take事务
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
大数据计算实践乐园,近距离学习前沿技术