3、实现步骤
(1)准备工作
在/opt/module/flume/job 目录下创建 group2 文件夹
(2)创建 flume-netcat-flume.conf(group2 文件夹下)
配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2
# Name the components on this agent a1.sources = r1 a1.channels = c1 a1.sinkgroups = g1 a1.sinks = k1 k2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
(3)创建 flume-flume-console1.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = logger # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(4)创建 flume-flume-console2.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
(5)执行配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
(6)使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
(7)查看Flume2及Flume3的控制台打印日志
(8)将Flume2 kill ,利用nc发送数据,观察 Flume3 的控制台打印情况
1.4.3 聚合
1、案例需求
hadoop102 上的 Flume-1 监控文件/opt/module/group.log,hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。
2、需求分析
3、案例实现
(1)准备工作
分发 Flume
xsync flume ##xsync是要自己写的分发脚本
在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。
(2)在 hadoop102 上编辑配置文件
vim flume1-logger-flume.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/group.log a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop104 a1.sinks.k1.port = 4141 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(3)在 hadoop103 上编辑配置文件
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = hadoop103 a2.sources.r1.port = 44444 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop104 a2.sinks.k1.port = 4141 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(4)在 hadoop104 上编辑配置文件
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop104 a3.sources.r1.port = 4141 # Describe the sink # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
(5)执行配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-logger-flume.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-netcat-flume.conf
(6)在 hadoop102 上向/opt/module 目录下的 group.log 追加内容
(7)在 hadoop103 上向 44444 端口发送数据
(8)检查 hadoop104 上数据