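1. Flume Installation and Configuration
- Flume website: http://flume.apache.org
- Requires JDK 1.7 or later
- The Flume release downloaded and installed here is: apache-flume-1.6.0-bin.tar.gz
<1> Extract the archive: tar -zxvf apache-flume-1.6.0-bin.tar.gz
<2> Installation directory: /usr/local/src/apache-flume-1.6.0-bin
<3> Configure the environment variables: vi ~/.bashrc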
    # new add FLUME_HOME
    export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
    # new add FLUME_HOME into PATH
    export PATH=$FLUME_HOME/bin:$PATH
<4> The complete ~/.bashrc:
    # .bashrc
    # User specific aliases and functions
    alias rm='rm -i'
    alias cp='cp -i'
    alias mv='mv -i'
    # Source global definitions
    if [ -f /etc/bashrc ]; then
        . /etc/bashrc
    fi
    iptables -F
    setenforce 0
    hostname master
    export JAVA_HOME=/usr/local/src/jdk1.7.0_80
    export HADOOP_HOME=/usr/local/src/hadoop-2.6.1
    # new add FLUME_HOME
    export FLUME_HOME=/usr/local/src/apache-flume-1.6.0-bin
    # added by Anaconda3
    #export PATH =/root/anaconda3/bin:$PATH
    export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib
    export PATH=$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
    # new add FLUME_HOME into PATH
    export PATH=$FLUME_HOME/bin:$PATH
<5> Reload the environment variables: source ~/.bashrc
To check that the configuration took effect, run echo $FLUME_HOME:
    [root@master ~]# echo $FLUME_HOME
    /usr/local/src/apache-flume-1.6.0-bin
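The check in step <5> can be extended into a small script that also confirms the launcher is where PATH expects it. This is only a sketch: check_flume_env is a hypothetical helper name, not something shipped with Flume, and it assumes the layout used above (flume-ng under $FLUME_HOME/bin).

```shell
# Hypothetical helper: verifies the environment set up in ~/.bashrc.
# Returns 0 when FLUME_HOME is set, bin/flume-ng exists, and bin is on PATH.
check_flume_env() {
    if [ -z "${FLUME_HOME:-}" ]; then
        echo "FLUME_HOME is not set"
        return 1
    fi
    if [ ! -f "$FLUME_HOME/bin/flume-ng" ]; then
        echo "flume-ng not found under $FLUME_HOME/bin"
        return 1
    fi
    # Wrap PATH in colons so the match works for first, middle and last entries.
    case ":$PATH:" in
        *":$FLUME_HOME/bin:"*)
            echo "OK: $FLUME_HOME/bin is on PATH"
            ;;
        *)
            echo "$FLUME_HOME/bin is not on PATH"
            return 1
            ;;
    esac
}
```

After source ~/.bashrc, calling check_flume_env should print the OK line; running flume-ng version afterwards is a further sanity check.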
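Flume configuration files are stored in: /usr/local/src/apache-flume-1.6.0-bin/conf
An agent configuration is written in four steps:
a) Configure the source
b) Configure the channel
c) Configure the sink
d) Wire the three components together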
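Steps a) to d) can be checked mechanically before starting an agent. The sketch below is a rough sanity check, not a full parser: check_agent_conf is a hypothetical helper, the agent name defaults to a1 as in the examples that follow, and it only looks for the declaration and binding lines, not for valid property values.

```shell
# Hypothetical helper: checks that a Flume properties file declares
# sources, channels and sinks for an agent and binds them together.
# Usage: check_agent_conf <conf-file> [agent-name]
check_agent_conf() {
    conf=$1
    agent=${2:-a1}
    status=0
    # a) b) c): the three component lists must each be declared with a value
    for key in sources channels sinks; do
        if ! grep -Eq "^[[:space:]]*${agent}\.${key}[[:space:]]*=[[:space:]]*[^[:space:]]" "$conf"; then
            echo "missing: ${agent}.${key}"
            status=1
        fi
    done
    # d): a source binds with 'channels' (plural), a sink with 'channel' (singular)
    if ! grep -Eq "^[[:space:]]*${agent}\.sources\.[^.]+\.channels[[:space:]]*=" "$conf"; then
        echo "missing: source -> channel binding"
        status=1
    fi
    if ! grep -Eq "^[[:space:]]*${agent}\.sinks\.[^.]+\.channel[[:space:]]*=" "$conf"; then
        echo "missing: sink -> channel binding"
        status=1
    fi
    return $status
}
```

For example, check_agent_conf $FLUME_HOME/conf/netcat_console.conf a1 prints nothing and returns 0 when all five lines are present.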
2.1 Using NetCat
Goal: listen on an IP port and print the received messages to the console
<1> In conf/, create netcat_console.conf:
    # example.conf: A single-node Flume configuration

    # Name the components on this agent
    ## agent name: a1
    ## name of a1's source: r1
    ## name of a1's sink: k1
    ## name of a1's channel: c1
    ## the property names are plural because several components can be configured
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    # configure source r1 of agent a1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    # configure sink k1 of agent a1
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    # configure channel c1 of agent a1
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    # a source can write to several channels; a sink reads from exactly one channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
<2> Run flume-ng
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/netcat_console.conf --name a1 -Dflume.root.logger=INFO,console
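The options of this command are:
- --conf $FLUME_HOME/conf: the directory that holds the configuration files
- --conf-file $FLUME_HOME/conf/netcat_console.conf: the configuration file to load
- --name a1: the agent name, which must match the name used in the configuration file
- -Dflume.root.logger=INFO,console: log at INFO level to the console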
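<3> Telnet to the configured host and port:
    [root@master badou]# telnet localhost 44444
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying
    Connected to localhost.
    Escape character is '^]'.
    111
    OK
    222
    OK
    333
    OK
Watch the Flume logger output in the agent's console: each line typed in telnet should show up there as an event.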
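Every example in this document starts the agent with the same flume-ng options and changes only the configuration file, so the invocation can be wrapped once. The following is a convenience sketch under that assumption: run_flume_agent and the DRY_RUN variable are hypothetical names introduced here, and the function expects configuration files named <name>.conf under $FLUME_HOME/conf.

```shell
# Hypothetical wrapper around the flume-ng command used in this document.
# Usage: run_flume_agent <conf-name-without-extension> [agent-name]
# Set DRY_RUN=1 to print the command instead of running it.
run_flume_agent() {
    conf_name=$1
    agent_name=${2:-a1}
    if [ -z "${FLUME_HOME:-}" ]; then
        echo "FLUME_HOME is not set" >&2
        return 1
    fi
    # Rebuild the positional parameters as the full command line.
    set -- flume-ng agent \
        --conf "$FLUME_HOME/conf" \
        --conf-file "$FLUME_HOME/conf/$conf_name.conf" \
        --name "$agent_name" \
        -Dflume.root.logger=INFO,console
    if [ "${DRY_RUN:-0}" = 1 ]; then
        echo "$*"
    else
        "$@"
    fi
}
```

With this in place, run_flume_agent netcat_console starts the agent from this section, and DRY_RUN=1 run_flume_agent netcat_console only prints the command that would be run.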
2.2 Using Exec
<1> In conf/, create exec_console.conf:
    # example.conf: A single-node Flume configuration

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /usr/local/src/flume_test.txt

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
<2> Run flume-ng
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_console.conf --name a1 -Dflume.root.logger=INFO,console
<3> Append content to the end of the monitored file:
echo 111 >> /usr/local/src/flume_test.txt
Watch the Flume logger output: the appended line should appear as a new event.
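A single echo produces a single event. To see several events flow through the exec source, a small loop can append numbered lines to the tailed file. This is only a test-data sketch: append_test_lines and the test-event-N line format are made up for illustration.

```shell
# Hypothetical helper: appends <count> numbered lines to <file>,
# so each line becomes one event for a 'tail -F' exec source.
# Usage: append_test_lines <file> [count]
append_test_lines() {
    target=$1
    count=${2:-5}
    i=1
    while [ "$i" -le "$count" ]; do
        echo "test-event-$i" >> "$target"
        i=$((i + 1))
    done
}
```

For instance, append_test_lines /usr/local/src/flume_test.txt 10 appends ten lines, which should show up one by one in the logger output.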
2.3 HDFS
**Goal:** Use Flume to upload a given file to HDFS, specifying the target location and the file naming rule
<1> In conf/, create avro_hdfs.conf:
    # example.conf: A single-node Flume configuration

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    # bind takes the hostname or IP address to listen on
    a1.sources.r1.type = avro
    a1.sources.r1.bind =
    a1.sources.r1.port = 41414

    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://master:9000/flume_data_pool
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    # roll to a new file every 600 seconds or 600000 events; 0 disables size-based rolling
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 600000
    a1.sinks.k1.hdfs.rollInterval = 600

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
<2> Run flume-ng
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
<3> Verify
flume-ng avro-client --conf conf -H master -p 41414 -F /usr/local/src/flume_test.txt -Dflume.root.logger=DEBUG,console
    hadoop fs -ls /
    # Check that the file content matches:
    hadoop fs -text /flume_data_pool/events-.1524279392273
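Comparing the content by eye works for a few lines; for larger files the comparison can be scripted. The sketch below assumes the sink wrote the whole upload into a single HDFS file (the roll settings may split it across several) and that events arrive in order. matches_local_file is a hypothetical helper name.

```shell
# Hypothetical helper: succeeds when the data read from stdin is
# byte-for-byte identical to the given local file.
# Usage: <producer command> | matches_local_file <local-file>
matches_local_file() {
    # 'cmp -s' is silent; '-' makes it read the first input from stdin
    cmp -s - "$1"
}

# Example (requires a running HDFS, so it is left commented out):
# hadoop fs -text /flume_data_pool/events-.1524279392273 \
#     | matches_local_file /usr/local/src/flume_test.txt && echo "content matches"
```

The helper returns a non-zero status on any difference, so it can also be used inside an if statement in a larger verification script.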
2.4 Simulating Log Monitoring with Flume and Writing the Incremental Log Data to HDFS
<1> In conf/, create exec_hdfs.conf:
    # example.conf: A single-node Flume configuration

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /usr/local/src/flume_test/monitor_source/1.log

    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/tailout/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 1
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.batchSize = 10
    a1.sinks.k1.hdfs.useLocalTimeStamp = true

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
<2> Run flume-ng
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec_hdfs.conf --name a1 -Dflume.root.logger=INFO,console
<3> Verify
echo 111 >> /usr/local/src/flume_test/monitor_source/1.log
hadoop fs -text /flume/tailout/18-04-21/1104/events-.1524279852216
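The directory part of the path above comes from the %y-%m-%d/%H%M escapes in hdfs.path, which the sink fills in from the event timestamp. The sketch below reproduces that mapping with date, to help predict where to look in HDFS. It rests on several assumptions: flume_bucket_dir is a hypothetical helper, the numeric file suffix is read as a millisecond epoch timestamp close to the event time, GNU date is available (for -d @seconds), and the machine's clock is UTC+8, written as the POSIX zone string CST-8.

```shell
# Hypothetical helper: formats a millisecond epoch timestamp the same way
# as the %y-%m-%d/%H%M escapes in hdfs.path.
# Usage: flume_bucket_dir <epoch-millis> [posix-tz]
flume_bucket_dir() {
    millis=$1
    tz=${2:-UTC}
    # GNU date: '@N' means N seconds since the epoch
    TZ=$tz date -d "@$((millis / 1000))" +%y-%m-%d/%H%M
}
```

For the file shown above, flume_bucket_dir 1524279852216 CST-8 prints 18-04-21/1104, which matches the directory in the hadoop fs -text command.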