前言
学数仓的时候发现 flume 落了一点,赶紧补齐。
1、Flume 事务
Source 在往 Channel 发送数据之前会开启一个 Put 事务:
- doPut:将批量数据写入临时缓冲区 putList(当 source 中的数据达到 batchsize 或者 超过特定的时间就会发送数据)
- doCommit:检查 channel 内存队列是否足够合并
- doRollback:如果 channel 内存队列空间不足没救回滚数据
同样 Sink 在从 Channel 主动拉取数据的时候也会开启一个 Take 事务:
- doTake:将数据读取到临时缓冲区 takeList,并将数据发送到 HDFS
- doCommit:如果数据全部发送成功,就会清除临时缓冲区 taskList
- dooRollback:数据发送过程如果出现异常,rollback 将临时缓冲区的数据归还给 channel 内存队列
2、Flume Agent 内部原理
注意:只有 source 和 channel 之间可以存在拦截器,channel 和 sink 之间不可以!
- source 接收数据,把数据封装成 Event
- 传给 channel processor 也就是 channel 处理器
- 把事件传给拦截器(interceptor),在拦截器这里可以对数据进行一些处理(我们在上一节中说过,当我们的路径信息中包含时间的时候,需要从 Event Header 中读取时间信息,如果没有就需要我们指定从本地读取 timestamp,所以这里我们就可以在拦截器这里给我们的 event 添加头部信息);而且,拦截器可以设置多个
- 经过拦截器处理的事件又返回给了 channel processor ,然后 channel processor 把事件传给 channel 选择器(channel selector 有两种类型:Replicating 和 Multiplexing ,Replicating 会把source 发送来的 events 发往所有 channel,而 multiplexing 可以配置指定发往哪些 channel)
- 经过 channel 选择器处理后的事件仍然返回给 channel processor
- channel processor 会根据 channel 选择器的结果,发送给相应的 channel(也就是这个时候才会真正的开启 put 事务,之前都是对 event 进行简单的处理)
- SinkProcessor 负责协调拉取 channel 中的数据,它有三种类型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(负载均衡,也就是多个 Sink 轮询的方式去读取 channel 中的数据)、FailoverSinkProcessor(故障转移,每个 sink 有自己的优先级,优先级高的去读取 channel 中的事件,只有当它挂掉的时候,才会轮到下一个优先级的 sink 去读)。其中 DefaultSinkProcessor 一个 channel 只能绑定一个 Sink,所以它也就没有 sink 组的概念。
注意:一个 sink 只可以绑定一个 channel ,但是一个 channel 可以绑定多个 sink!
3、Flume 拓扑结构
3.1、简单串联
官网这段话翻译过来就是:为了将数据跨越多个代理或跃点进行传输,前一个代理的接收器(sink)和当前跃点的源(source)需要是avro类型,接收器指向源的主机名(或IP地址)和端口。
这种模式的缺点很好理解,就像串联电路,一个节点坏了会影响整个系统。
3.2、复制和多路复用
从官网翻译过来就是:上述示例显示了一个名为“foo”的代理源将流程分散到三个不同的通道。这种分散可以是复制或多路复用。在复制流程的情况下,每个事件都会发送到这三个通道。对于多路复用的情况,当事件的属性与预配置的值匹配时,事件将被发送到可用通道的子集。例如,如果事件属性名为“txnType”设置为“customer”,则应发送到channel1和channel3,如果为“vendor”,则应发送到channel2,否则发送到channel3。映射可以在代理的配置文件中设置。
这种模式相比上面的串联模式的优点无非就是可以发送过多个目的地。
3.3、负载均衡和故障转移
Flume 支持多个 Sink 逻辑上分到一个 Sink 组,sink 组配合不同的 SinkProcessor ,可以实现负载均衡和错误恢复的功能。
3.4、聚合
这种模式在实际开发中是经常会用到的,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的 flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。
4、Flume 企业开发实例
4.1、复制和多路复用
注意:多路复用必须配合拦截器使用,因为需要在 Event Header 中添加一些信息。
1)案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2)需求分析
- 监控文件变动我们可以考虑使用 taildir 或者 exec 这两种 source
- flume-1 sink 需要使用 avro sink 才能传输到下一个 flume-2 和 flume-3 的 source
- flume-2 需要上传数据到 HDFS 所以 sink 为 hdfs
- flume-3 需要把数据输出到本地,所以 sink 为 file_roll sink(要保存到本地目录,这个目录就必须提前创建好,它不像 HDFS Sink 会自动帮我们创建)
我们需要实现三个 flume 作业:
- flume-1 把监听到的新日志读取到 flume-2 和 flume-3 的 source
- flume-2 把日志上传到 hdfs
- flume-3 把日志写到本地
Flume(二)【Flume 进阶使用】(2)https://developer.aliyun.com/article/1532353