Flume 使用学习小结-阿里云开发者社区

开发者社区> 大数据> 正文

Flume 使用学习小结

简介: #概述 在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。 Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制; 可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->

概述

  在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。

Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;
可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。

Flume架构

  Flume主要分为 Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application。数据从源头经过Agent的这几个组件最后到达目的地。一个Flume 服务可同时运行多个Agent,大致架构可参照下图:

_

对照这个图,作一些说明;

  • Event 一条日志数据在Flume中对应一个Event对象。不过给他添加了header属性,就是一个Map,放 一些额外信息,可以针对每条Event做特殊处理,比如Channel的选择。这些额外的键值对可以在Event从Source到Channel之间的interceptor(拦截器)中set。
  • Source 负责日志流入,比如从文件、网络、MQ等数据源流入数据。
  • Channel 负责数据聚合/暂存,以供Sink消费掉,事务机制主要在这里实现
  • Sink 负责数据转移到存储,比如从Channel拿到日志后直接存储到ODPS、ElasticSearch等。
  • 拦截器 如果配置了拦截器,则Event从Source 进入Channel前,经过拦截器链做过滤或其他处理;如识别不需要的数据等
  • 选择器 Flume默认实现有ReplicatingChannelSelector(复制,Event可同时发往多个Channel)和MultiplexingChannelSelector(复用,可根据header中某个字段值,发往不同的Channel)

下图是个简单的多Channel、Sink情况;Flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。
_2

Flume实际使用

 现在做的埋点数据导入ODPS的情况是,每天夜里1点左右把前一天的日志文件copy到Flume监控的目录,Flume处理新加入的文件。最终数据存储到ODPS。

遇到的问题:

  • 日志数据中包含空行等不正确格式的记录,导致从Channel中take日志记录后保存到ODPS失败;失败的操作被事务回滚,结果是数据流传在这个地方错误循环下去。
  • 日志数据按实际生成的日期为分区保存在OPDS表的分区中,导入日志数据的日期为实际日期的后一天。

在尝试了几种方法后,最后选择自定义了一个拦截器实现(UaLogFilteringInterceptor),能很好的达到目前的需求,他主要做如下两件事:

  • 过滤掉不需要、不规范的数据,并且把过滤掉的这些数据存储到指定的文件里,每天一个文件(如果有异常记录)。
  • 在每条Event的header中加入qt值(qt值为前一天的日期,格式为yyyyMMdd),每条Event根据该值保持到ODPS的对应表分区中。

这里顺带说下ODPS的Flume插件,他主要根据ODPS的特点自定义实现了一个Sink,在Flume的配置文件中配置使用该Sink,配置好该Sink的各个配置项,主要包含连接ODPS和使用对应表的。

说说Flume的事务

  主要用到事务实现有针对MemoryChannel的MemoryTransaction和FileChannel的FileBackedTransaction;

Event从Source PUT到Channel和从Channel Take到Sink后落地,这两个步骤都包裹在事务中;我这里说下MemoryTransaction大致实现。
MemoryTransaction 主要用到了两个双向阻塞队列(LinkedBlockingDeque)putList和takeList作为缓冲区,同时配合使用MemoryChannel中的LinkedBlockingDeque queue;队列的大小通过Flume的配置初始化好;

  • PUT事务

    1. 批量数据循环PUT到putList中
    2. Commit,把putList队列中数据offer到queue队列中,然后释放信号量,清空(clear)putList队列
    3. Rollback,清空(clear)putList队列
      这里其实没有做太多事。
  • Take事务

    1. 检查takeList队列大小是否够用,从queue队列中poll Event到takeList队列中
    2. Commit,表明被Sink正确消费掉,清空(clear)takeList队列
    3. Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

其他文章