Flume安装及配置

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Flume 提供了大量内置的 Source、Channel 和 Sink 类型。而且不同类型的 Source、Channel 和 Sink 可以自由组合—–组合方式基于配置文件的设置,非常灵活。比如:Channel 可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink 可以把日志写入 HDFS、HBase,甚至是另外一个 Source 等。

Flume 提供了大量内置的 Source、Channel 和 Sink 类型。而且不同类型的 Source、Channel 和 Sink 可以自由组合—–组合方式基于配置文件的设置,非常灵活。比如:Channel 可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink 可以把日志写入 HDFS、HBase,甚至是另外一个 Source 等。

安装


  • 下载源码包到 /usr/local/src 目录下
  • 解压tar.gz包
  • 修改配置文件,在 /usr/local/src/apache-flume-1.7.0-bin/conf 目录下
  • 配置环境变量

核心配置


  • /usr/local/src/apache-flume-1.7.0-bin/conf 目录下提供默认配置模板
  • 配置示例——NetCat Source:监听一个指定的网络端口,只要应用程序向这个端口里面写数据,这个 Source 组件就可以获取到数据。 其中 Sink:logger, Channel:memory 。
  • 配置如下:
# Name the components on this agent
agent.sources = netcat
agent.channels = memoryChannel
agent.sinks = loggerSink
# configure the source
agent.sources.netcat.type = netcat
agent.sources.netcat.bind = 192.168.111.238
agent.sources.netcat.port = 8888
# Describe the sink
agent.sinks.loggerSink.type = logger
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# It specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
# Bind the source and sink to the channel
agent.sources.netcat.channels = memoryChannel
agent.sinks.loggerSink.channel = memoryChannel

运行测试


  • 启动名字为 agent 的 Flume Agent 服务端进程
./bin/flume-ng agent -n agent -c ./conf -f ./conf/flume-netcat.properties -Dflume.root.logger=DEBUG,console

参数说明:

-n 指定 agent 名称(与配置文件中代理的名字相同)

-c 指定 Flume 中配置文件的目录

-f 指定配置文件

-Dflume.root.logger=DEBUG,console 设置日志等级

  • 使用 telnet 发送数据
zhwye@ubuntu238:~$ telnet 192.168.111.238 8888
Trying 192.168.111.238...
Connected to 192.168.111.238.
Escape character is '^]'.
big data world
OK
  • 服务端进程收集到的日志数据如下
...
2018-04-12 07:07:22,965 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:315)] Starting connection handler
2018-04-12 07:07:31,457 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:327)] Chars read = 16
2018-04-12 07:07:31,468 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:331)] Events processed = 1
2018-04-12 07:07:35,191 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 77 6F 72 6C 64 0D    big data world. }

常用配置模式


NetCat Source——HDFS Sink 配置


  • 监听一个指定的网络端口,只要应用程序向这个端口里面写数据,这个 Source 组件就可以获取到数据。 其中 Sink:HDFS, Channel:file
  • 配置文件
1. # Name the components on this agent
2. a1.sources = r1
3. a1.sinks = k1
4. a1.channels = c1
5. 
6. # Describe/configure the source
7. a1.sources.r1.type = netcat
8. a1.sources.r1.bind = 192.168.111.238
9. a1.sources.r1.port = 8888
10. 
11. # Describe the sink
12. a1.sinks.k1.type = hdfs
13. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput
14. a1.sinks.k1.hdfs.writeFormat = Text
15. a1.sinks.k1.hdfs.fileType = DataStream
16. a1.sinks.k1.hdfs.rollInterval = 10
17. a1.sinks.k1.hdfs.rollSize = 0
18. a1.sinks.k1.hdfs.rollCount = 0
19. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
20. a1.sinks.k1.hdfs.useLocalTimeStamp = true
21. 
22. # Use a channel which buffers events in file
23. a1.channels.c1.type = file
24. a1.channels.c1.checkpointDir = /usr/flume/checkpoint
25. a1.channels.c1.dataDirs = /usr/flume/data
26. 
27. # Bind the source and sink to the channel
28. a1.sources.r1.channels = c1
29. a1.sinks.k1.channel = c1
• 启动 Flume agent a1 服务端
30. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-hdfs.conf   -Dflume.root.logger=DEBUG,console
• 使用 telnet 发送数据
31. zhwye@ubuntu238:~$ telnet 192.168.111.238 8888
32. Trying 192.168.111.238...
33. Connected to 192.168.111.238.
34. Escape character is '^]'.
35. big data world
36. OK

Spooling Directory Source——HDFS Sink 配置


  • 监听一个指定的目录,只要应用程序向这个指定的目录中添加新的文件,Source 组件就可以获取到该o数据,并解析该文件的内容,然后写入到 Channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:HDFS, Channel:file
1. Property Name       Default      Description
2. channels              –  
3. type                  –          The component type name, needs to be spooldir.
4. spoolDir              –          Spooling Directory Source监听的目录
5. fileSuffix         .COMPLETED    文件内容写入到channel之后,标记该文件
6. deletePolicy       never         文件内容写入到channel之后的删除策略: never or immediate
7. fileHeader         false         Whether to add a header storing the absolute path filename.
8. ignorePattern      ^$           Regular expression specifying which files to ignore (skip)
9. interceptors          –          指定传输中event的head(头信息),常用timestamp
• 配置文件
10. # Name the components on this agent
11. a1.sources = r1
12. a1.sinks = k1
13. a1.channels = c1
14. 
15. # Describe/configure the source
16. a1.sources.r1.type = spooldir
17. a1.sources.r1.spoolDir = /usr/local/datainput
18. a1.sources.r1.fileHeader = true
19. a1.sources.r1.interceptors = i1
20. a1.sources.r1.interceptors.i1.type = timestamp
21. 
22. # Describe the sink
23. # Describe the sink
24. a1.sinks.k1.type = hdfs
25. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput
26. a1.sinks.k1.hdfs.writeFormat = Text
27. a1.sinks.k1.hdfs.fileType = DataStream
28. a1.sinks.k1.hdfs.rollInterval = 10
29. a1.sinks.k1.hdfs.rollSize = 0
30. a1.sinks.k1.hdfs.rollCount = 0
31. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
32. a1.sinks.k1.hdfs.useLocalTimeStamp = true
33. 
34. # Use a channel which buffers events in file
35. a1.channels.c1.type = file
36. a1.channels.c1.checkpointDir = /usr/flume/checkpoint
37. a1.channels.c1.dataDirs = /usr/flume/data
38. 
39. # Bind the source and sink to the channel
40. a1.sources.r1.channels = c1
41. a1.sinks.k1.channel = c1
• 启动 Flume agent a1 服务端
42. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-spool-hdfs.conf   -Dflume.root.logger=DEBUG,console
• 使用 cp 命令向 Spooling Directory 中发送数据
43. cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)
• Spooling Directory Source的两个注意事项
44. 1.拷贝到spool目录下的文件不可以再打开编辑
45. 2.不能将具有相同文件名字的文件拷贝到这个目录下

Exec Source——HDFS Sink 配置


  • 监听一个指定的命令,获取一条命令的结果作为它的数据源 常用的是 tail-F file 指令,只要应用程序向日志(文件)里面写数据,Source 组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:HDFS, Channel:file
  • 配置文件
1. # Name the components on this agent
2. a1.sources = r1
3. a1.sinks = k1
4. a1.channels = c1
5. 
6. # Describe/configure the source
7. a1.sources.r1.type = exec
8. a1.sources.r1.command = tail -F /usr/local/log.file
9. 
10. # Describe the sink
11. a1.sinks.k1.type = hdfs
12. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput
13. a1.sinks.k1.hdfs.writeFormat = Text
14. a1.sinks.k1.hdfs.fileType = DataStream
15. a1.sinks.k1.hdfs.rollInterval = 10
16. a1.sinks.k1.hdfs.rollSize = 0
17. a1.sinks.k1.hdfs.rollCount = 0
18. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
19. a1.sinks.k1.hdfs.useLocalTimeStamp = true
20. 
21. # Use a channel which buffers events in file
22. a1.channels.c1.type = file
23. a1.channels.c1.checkpointDir = /usr/flume/checkpoint
24. a1.channels.c1.dataDirs = /usr/flume/data
25. 
26. # Bind the source and sink to the channel
27. a1.sources.r1.channels = c1
28. a1.sinks.k1.channel = c1
• 启动 Flume agent a1 服务端
29. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-exec-hdfs.conf   -Dflume.root.logger=DEBUG,console
• 使用 echo 命令向 /usr/local/datainput 中发送数据
30. echo  big data > log.file

Avro Source——HDFS Sink 配置


  • 监听一个指定的 Avro 端口,通过 Avro 端口可以获取到 Avro Client 发送过来的文件 。只要应用程序通过 Avro 端口发送文件,Source 组件就可以获取到该文件中的内容。 其中 Sink:HDFS,Channel:file (注:Avro 和 Thrift 都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro 可以发送一个给定的文件给 Flume,是 Agent 间的通信协议)
  • 编写配置文件
1. # Name the components on this agent
2. a1.sources = r1
3. a1.sinks = k1
4. a1.channels = c1
5. 
6. # Describe/configure the source
7. a1.sources.r1.type = avro
8. a1.sources.r1.bind = 192.168.111.238
9. a1.sources.r1.port = 4141
10. 
11. # Describe the sink
12. a1.sinks.k1.type = hdfs
13. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput
14. a1.sinks.k1.hdfs.writeFormat = Text
15. a1.sinks.k1.hdfs.fileType = DataStream
16. a1.sinks.k1.hdfs.rollInterval = 10
17. a1.sinks.k1.hdfs.rollSize = 0
18. a1.sinks.k1.hdfs.rollCount = 0
19. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
20. a1.sinks.k1.hdfs.useLocalTimeStamp = true
21. 
22. # Use a channel which buffers events in file
23. a1.channels.c1.type = file
24. a1.channels.c1.checkpointDir = /usr/flume/checkpoint
25. a1.channels.c1.dataDirs = /usr/flume/data
26. 
27. # Bind the source and sink to the channel
28. a1.sources.r1.channels = c1
29. a1.sinks.k1.channel = c1
• 启动 Flume agent a1 服务端
30. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-avro-hdfs.conf   -Dflume.root.logger=DEBUG,console
• 使用avro-client发送文件
31. flume-ng avro-client -c  ../conf  -H 192.168.111.238  -p 4141 -F /usr/local/log.file

总结


总结 Exec Source 和 Spooling Directory Source 是两种常用的日志采集的方式,其中 Exec Source 可以对日志的实时采集,但是当 Flume 不运行或者指令执行出错时,Exec Source 将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。而 Spooling Directory Source 在对日志的实时采集上有欠缺。

相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
533 1
flume的log4j.properties配置说明
flume的log4j.properties配置说明
218 0
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
824 0
|
数据采集 消息中间件 存储
Flume 快速入门【概述、安装、拦截器】
Apache Flume 是一个开源的数据采集工具,用于从各种数据源(如日志、网络数据、消息队列)收集大规模数据,并将其传输和加载到数据存储系统(如 HDFS、HBase、Hive)。Flume 由数据源(Source)、通道(Channel)、拦截器(Interceptor)和接收器(Sink)组成,支持灵活配置以适应不同的数据流处理需求。安装 Flume 包括解压软件包、配置环境变量和调整日志及内存设置。配置文件定义数据源、通道、拦截器和接收器,拦截器允许预处理数据。Flume 适用于构建数据管道,整合分散数据到中心存储系统,便于分析和报告。
2284 3
|
存储 监控 Linux
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
【2月更文挑战第17天】Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
347 1
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
|
XML 数据格式
Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
【2月更文挑战第19天】Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
296 1
|
消息中间件 存储 SQL
Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
【2月更文挑战第18天】Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
3557 0
|
存储 监控 Linux
Ganglia【部署 01】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
Ganglia【部署 01】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
309 0
|
数据采集 消息中间件 缓存
Apache Flume及快速安装
Apache Flume及快速安装
228 0
|
消息中间件 存储 Java
flume的安装和配置
flume的安装和配置

热门文章

最新文章