1、Flume进阶
1.1 Flume事务
1.2 Flume Agent内存原理
1、ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
2、SinkProcessor
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingProcessor和FailoverSinkProcessor。
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverProcessor对应的是Sink,Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复功能。
1.3 拓扑结构
1.3.1 简单串联
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
1.3.2 复制和多路复用
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。
1.3.3 负载均衡和故障转移
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。
1.3.4 聚合
这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。
1.4 案例实现
前提说明:在Flume之间传输数据要用avro,并且Source用的是avro的FlumeAgent是服务端,在开启时要先开启服务端!!!!
1.4.1 复制和多路复用
1、案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2、需求分析
3、实现步骤
(1)准备工作
在/opt/module/flume/job 目录下创建 group1 文件夹
在/opt/module/datas/目录下创建 flume3 文件夹
(2)创建 flume-file-flume.conf(group1文件夹下)
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给所有 channel a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell = /bin/bash -c # Describe the sink # sink 端的 avro 是一个数据发送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
3)创建 flume-flume-hdfs.conf(group1文件夹下)
作用:配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source # source 端的 avro 是一个数据接收服务 a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 30 #设置每个文件的滚动大小大概是 128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k1.hdfs.rollCount = 0 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(4)创建 flume-flume-dir.conf(group1文件夹下)
作用:配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/data/flume3 # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
(5)执行配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
(6)启动hadoop和Hive
start-dfs.sh start-yarn.sh bin/hive
(7)检查HDFS上数据
(8)检查/opt/module/datas/flume3 目录中数据
1.4.2 负载均衡和故障转移
1、案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。
2、需求分析