Flume入门

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,可以是文件、可以是hdfs。安装tar -zxvf apache-flume-1.

flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,可以是文件、可以是hdfs。

安装

tar -zxvf apache-flume-1.6.0-bin.tar.gz

配置环境变量

export FLUME_HOME=/xxx/flume
export PATH=$PATH:$FLUME_HOME/bin
修改conf下的flume-env.sh,在里面配置JAVA_HOME

验证

flume-ng  version


[root@s166 log]# flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f

好了,到这里我们环境就配置好了。

实例1:监听一个指定的网络端口

1.1 配置文件
flume官网中NetCat Source描述:

Property Name Default     Description
channels       –     
type           –     The component type name, needs to be netcat
bind           –  日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听          
port           –  日志需要发送到的端口号,该端口号要有netcat类型的source在监听   

然后在flume/conf目录下创建一个配置文件netcat-logger.conf

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 描述和配置sink组件:k1
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

表示的是监听44444端口

1.2 启动收集

flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

-c conf   指定flume自身的配置文件所在目录
-f conf/netcat-logger.con  指定我们所描述的采集方案
-n a1  指定我们这个agent的名字
1.3 测试

在另一个终端上执行nc localhost 44444(没有nc的yum install nmap-ncat.x86_64,如果没有该包,请更新成阿里yum源:Redhat7.x 修改阿里云yum源

[root@s166 log]# nc localhost 44444
hello
OK
fantj
OK

然后看flume服务端的响应:

 18:48:48 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
 18:48:49 INFO sink.LoggerSink: Event: { headers:{} body: 66 61 6E 74 6A                                  fantj }

实例2. 监听一个指定的目录,每当有新文件出现,就需要把文件采集到HDFS中去

sources.type:  spooldir
sinks.type: hdfs
2.1 配置文件

flume/conf目录下创建一个配置文件spooldir.conf

#定义三大组件的名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# 配置source组件(监听的文件不能重复)

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/fantj/log/
agent1.sources.source1.fileHeader = false

#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# 配置sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://s166/weblog/flume-collection/%y-%m-%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

大概意思是:监听/home/fantj/log/这个文件,并把它上传到hdfs://s166/weblog/flume-collection/%y-%m-%d/这个路径下。

2.2 启用收集

flume-ng agent -c conf -f ../conf/spoordir.conf -n agent1 -Dflume.root.logger=INFO,console

2.3 测试

我在/home/fantj/log目录下创建一个文本文件。

test.txt

this is a spoordir agent test

然后看flume服务端响应:

 19:00:24 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/fantj/log/test.txt to /home/fantj/log/test.txt.COMPLETED
 19:00:24 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
 19:00:24 INFO hdfs.BucketWriter: Creating hdfs://s166/weblog/flume-collection/18-07-27//access_log.1532732424184.tmp

上传完成后,它会给这个文件加个后缀变成test.txt.COMPLETED来表示成功。

我们打开hadoop的管理页:http://192.168.27.166:50070


img_2e4fd78293d32faefeacdab477c223fe.png

img_0b40d2320e4b7911c56f2d5dfbb3f9ac.png

img_cf8826c47dcda62528a38333fd41fc25.png
打开文件查看

实例3:监听一个指定的文件,每当有新更改,就需要把文件采集到HDFS中去

sources.type: exec
sink.type: hdfs 
3.1 配置文件

同样,我们创建exec.conf文件

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/fantj/log/web_log.log
agent1.sources.source1.channels = channel1

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://s166/weblog/flume/%y-%m-%d/
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

监听/home/fantj/log/web_log.log这个文件,上传到hdfs://s166/weblog/flume/%y-%m-%d/

3.2 启动

flume-ng agent -c conf -f ../conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console

3.3 测试

我在这个文件里新添:

test
test
test

然后看flume服务端的响应:

19:15:54 INFO hdfs.BucketWriter: Creating hdfs://s166/weblog/flume/18-07-27//access_log.1532733353751.tmp
19:16:56 INFO hdfs.BucketWriter: Closing hdfs://s166/weblog/flume/18-07-27//access_log.1532733353751.tmp
19:16:56 INFO hdfs.BucketWriter: Renaming hdfs://s166/weblog/flume/18-07-27/access_log.1532733353751.tmp to hdfs://s166/weblog/flume/18-07-27/access_log.1532733353751

Creating(刚启动日志) ->Closing ->Renaming (修改文件后日志)
同理我把文件下载下来打开:


img_ba66d50669a49ddc589801811671b4e1.png
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
SQL 分布式计算 监控
Flume学习--1、Flume概述、Flume入门、(一)
Flume学习--1、Flume概述、Flume入门、(一)
|
8月前
|
消息中间件 存储 数据采集
bigdata-11-Flume入门与部署
bigdata-11-Flume入门与部署
84 0
|
8月前
|
消息中间件 存储 SQL
Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
【2月更文挑战第18天】Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
2049 0
|
JSON 监控 Unix
Flume学习--1、Flume概述、Flume入门、(二)
Flume学习--1、Flume概述、Flume入门、(二)
|
存储 缓存 分布式计算
入门Flume
你好看官,里面请!今天笔者讲的是入门Flume。不懂或者觉得我写的有问题可以在评论区留言,我看到会及时回复。 注意:本文仅用于学习参考,不可用于商业用途,如需转载请跟我联系。
304 1
|
SQL 消息中间件 缓存
Apache Flume- 安装部署&简单入门|学习笔记
快速学习 Apache Flume- 安装部署&简单入门
Apache Flume- 安装部署&简单入门|学习笔记
|
消息中间件 大数据 Kafka
Flume入门案例之NetCat-Souces
Flume入门案例之NetCat-Souces
167 0
Flume入门案例之NetCat-Souces
|
存储 缓存 监控
Hadoop-Flume基础理论入门(1)
Hadoop-Flume基础理论入门(1)
Hadoop-Flume基础理论入门(1)
|
8月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析