Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【2月更文挑战第19天】Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)

Flume的配置文件就是类似与Kettle的ktr或者kjb,从哪里获取数据怎么处理录到哪里都是通过配置文件进行描述的,官方《Flume 1.9.0 User Guide》已经很详细了,各种sources、channels、sinks都有相当详细的配置说明和demo举例,我们这里弄几个常用的案例进行测试说明。

1.配置格式

配置通常需要【定义】和【绑定】两个部分,放在哪里就是个人习惯了,我习惯定义在上,绑定在下:

  1. 定义 Agent 的 Sources,Channels,Sinks 及其具体参数【参数可以从官网查询】。基本格式如下:
    ```xml

    定义agentName的sources、channels、sinks

    .sources =
    .channels =
    .sinks =

定义sources的具体属性

.sources.. =

定义channels的具体属性

.channels.. =

.channels.. =

定义sinks的具体属性

.sinks.. =


2. 绑定 Sources 和 Sinks 的 Channels。需要注意的是一个Sources可以配置多个Channels,但一个 Sink只能配置一个Channel。【sources后的是channels,sinks后的是channel,一定要注意。】基本格式如下:
```xml
# 绑定sources的channels 
<agentName>.sources.<sourceName>.channels = <channelName1> <channelName2> ...

# 绑定sinks的channel
<agentName>.sinks.<sinkName>.channel = <channelName1>

2.文件命名

配置文件的命名并没有很严格的规范,我的命名习惯是【模块名称】-【sources类型】-【channels类型】-【sinks类型】各个部分都可以简写以缩短文件名。如何通过配置文件启动一个agent:

# 单- 短指令
flume-ng agent \
-n $agent_name \
-c conf -f conf/flume-conf.properties.template
# 双-- 完整指令
flume-ng agent \
--name $agent_name \
--conf conf --conf-file /home/flume/test/test-exec-memory-logger.properties \ -Dflume.root.logger=INFO,console

3.常用案例

3.1 案例一

这里主要是使用一下exec类型的source,exec类型的source能实现的数据获取很广泛,因为command能实现的操作太多了。sinks使用最简单的logger,我们看一下官方的配置及Demo:

在这里插入图片描述

  1. 配置

新建配置文件test-exec-memory-logger.properties 其内容如下:

# 1.定义
# 定义agent的sources,channels,sinks
a1.sources = s1 
a1.channels = c1 
a1.sinks = k1 
# 定义sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
# 定义channel类型
a1.channels.c1.type = memory
# 定义sink属性
a1.sinks.k1.type = logger

# 2.绑定
#将sources与channels进行绑定【a1的数据源s1的通道是】
a1.sources.s1.channels = c1 
#将sinks与channels进行绑定 【a1的sinks k1的通道是】
a1.sinks.k1.channel = c1
  1. 启动

我们配置过FLUME_HOME的环境变量,所以可在任何地方执行:

[root@tcloud ~]# flume-ng agent \
-n a1 \
-c conf \
-f /home/flume/test/test-exec-memory-logger.properties \
-Dflume.root.logger=INFO,console
  1. 测试

向文件中追加数据:

[root@tcloud ~]# echo "Hello" >> /tmp/log.txt
[root@tcloud ~]# echo "ARE" >> /tmp/log.txt
[root@tcloud ~]# echo "YOU" >> /tmp/log.txt
[root@tcloud ~]# echo "OK" >> /tmp/log.txt

控制台的显示:

# 这里省去一些日志信息只贴出主要的
2021-08-24 11:58:05,499 INFO sink.LoggerSink: Event: 
{
   
    headers:{
   
   } body: 48 65 6C 6C 6F                                  Hello }
{
   
    headers:{
   
   } body: 41 52 45                                        ARE }
{
   
    headers:{
   
   } body: 59 4F 55                                        YOU }
{
   
    headers:{
   
   } body: 4F 4B                                           OK }

3.2 案例二

这里主要是使用一下spooldir类型的source,sinks使用hdfs,将指定目录下的文件上传到hdfs也是常用的场景,我们看一下官方的配置及Demo:

在这里插入图片描述
在这里插入图片描述

  1. 配置

新建配置文件test-exec-memory-logger.properties 其内容如下:

# 1.定义
# 定义agent的sources,channels,sinks
a1.sources = s1 
a1.channels = c1 
a1.sinks = k1 
# 定义sources属性
a1.sources.s1.type = spooldir 
a1.sources.s1.spoolDir = /tmp/logs
    # /tmp/logs 文件夹必须存在 否则会报错
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
# 定义channel属性
a1.channels.c1.type = memory
# 定义sink属性
a1.sinks.k1.type = hdfs
    # 这个地方要注意 不要写成/flume/events/%y-%m-%d/%H/
    # 否则将会是这样的结果 /flume/events/21-08-24/15//log.txt.1629791801963
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = %{
   
   fileName}
    # 生成的文件类型 默认是Sequencefile 可用DataStream 则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 2.绑定
# 将sources与channels进行绑定 
a1.sources.s1.channels = c1
# 将sinks与channels进行绑定 
a1.sinks.k1.channel = c1
  1. 启动

启动前要先启动hdfs:

[root@tcloud sbin]# start-dfs.sh
[root@tcloud sbin]# hdfs version
Hadoop 3.1.3

这次使用以下双杠指令:

[root@tcloud ~]# flume-ng agent \
--name a1 \
--conf conf \
--conf-file /home/flume/test/test-spooling-memory-hdfs.properties \
-Dflume.root.logger=INFO,console
  1. 测试

我们拷贝案例一生成的文件 /tmp/log.txt 到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

[root@tcloud ~]# cp /tmp/log.txt /tmp/logs/
# 日志信息
2021-08-24 15:56:41,727 INFO avro.ReliableSpoolingFileEventReader: 
Preparing to move file /tmp/logs/log.txt to /tmp/logs/log.txt.COMPLETED
2021-08-24 15:56:41,962 INFO hdfs.HDFSDataStream: 
Serializer = TEXT, UseRawLocalFileSystem = false
2021-08-24 15:56:42,265 INFO hdfs.BucketWriter: 
Creating /flume/events/21-08-24/15/log.txt.1629791801963.tmp

查看上传到 HDFS 上的文件内容与本地是否一致:

[root@tcloud ~]# hdfs dfs -cat /flume/events/21-08-24/15/log.txt.1629791801963
2021-08-24 15:58:24,855 INFO sasl.SaslDataTransferClient: 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Hello
ARE
YOU
OK

3.3 案例三

Avro是一个数据序列化系统,设计用于支持大批量数据交换的应用。这里主要是使用一下avro类型的source和sink,将本服务器收集到的数据发送到另外一台服务器。特别注意 :exclamation::exclamation::exclamation: 大家一定要分清 source 和 sink,两台服务器绑定的都是 【把avro作为source的服务器的ip和端口】 这点儿一定要配置好。

在这里插入图片描述
在这里插入图片描述

  1. 配置日志收集Flume

新建配置 test-exec-memory-avro.properties【某些备注不再赘述】,监听文件内容变化,然后将新的文件内容通过avro sink 发送到 tcloud 服务器的 14545 端口:

# 1.定义
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = tcloud
a1.sinks.k1.port = 14545
a1.sinks.k1.batch-size = 1

# 2.绑定
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  1. 配置日志聚合Flume

新建配置 test-avro-memory-logger.properties【某些备注不再赘述】, 监听 tcloud 服务器的 14545 端口,将获取到内容输出到控制台:

# 1.定义
a2.sources = s2
a2.channels = c2
a2.sinks = k2
# sources
a2.sources.s2.type = avro
a2.sources.s2.bind = tcloud
a2.sources.s2.port = 14545
# channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# sink
a2.sinks.k2.type = logger

# 2.绑定
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2
  1. 启动

在tcloud服务器上启动日志聚集 Flume【此服务器将avro作为sources】:

[root@tcloud ~]# flume-ng agent \
--name a2 \
--conf conf \
--conf-file /home/flume/test/test-avro-memory-logger.properties \
-Dflume.root.logger=INFO,console

在aliyun服务器上启动日志收集 Flume【此服务器将avro作为sinks】:

[root@aliyun ~]#flume-ng agent \
--name a1 \
--conf conf \
--conf-file /home/flume/test/test-exec-memory-avro.properties \
-Dflume.root.logger=INFO,console

这里建议按以上顺序启动,原因是 Avro Source 会先与端口进行绑定,这样 Avro Sink 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,sink会一直重试,直至建立好连接。

# sources端 连接成功
2021-08-25 09:43:40,119 INFO node.Application: Starting Sink k2
2021-08-25 09:43:40,120 INFO node.Application: Starting Source s2
2021-08-25 09:43:40,121 INFO source.AvroSource: Starting Avro source s2: {
   
    bindAddress: tcloud, port: 14545 }...
2021-08-25 09:43:40,490 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s2: Successfully registered new MBean.
2021-08-25 09:43:40,490 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s2 started
2021-08-25 09:43:40,500 INFO source.AvroSource: Avro source s2 started.

# sinks端 连接成功
2021-08-25 09:43:44,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: tcloud, port: 14545
2021-08-25 09:43:44,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2021-08-25 09:43:44,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers

4.测试

在aliyun也就是【avro作为sinks端】的服务器上向文件 /tmp/log.txt 中追加内容:

[root@aliyun ~]# echo "Hello" >> /tmp/log.txt
[root@aliyun ~]# echo "Are" >> /tmp/log.txt
[root@aliyun ~]# echo "You" >> /tmp/log.txt
[root@aliyun ~]# echo "Ok" >> /tmp/log.txt

在tcloud也就是【avro作为source是端】的服务器上可以看到已经从 14545 端口监听到内容,并成功输出到控制台:

2021-08-25 09:51:48,176 INFO sink.LoggerSink: 
Event: {
   
    headers:{
   
   } body: 48 65 6C 6C 6F                                  Hello }
2021-08-25 09:51:54,084 INFO sink.LoggerSink: 
Event: {
   
    headers:{
   
   } body: 41 72 65                                        Are }
2021-08-25 09:52:03,118 INFO sink.LoggerSink: 
Event: {
   
    headers:{
   
   } body: 59 6F 75                                        You }
2021-08-25 09:52:07,311 INFO sink.LoggerSink: 
Event: {
   
    headers:{
   
   } body: 4F 6B                                           Ok }

4.总结

Flume的部署和使用都相对简单,官网给出的配置和demo也比较详细,我们在使用的时候结合使用场景,选择需要的Soucres、Channels、Sinks类型就行,希望这些内容有用 :sunny:

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
7月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
7月前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
306 1
|
7月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
144 1
|
7月前
|
Java
flume的log4j.properties配置说明
flume的log4j.properties配置说明
|
7月前
|
存储 监控 Linux
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
【2月更文挑战第17天】Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
205 1
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
|
7月前
|
监控 Apache
【Flume】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
【4月更文挑战第4天】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
|
7月前
|
消息中间件 存储 SQL
Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
【2月更文挑战第18天】Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
1970 0
|
7月前
flume之avro实践
flume之avro实践
147 0
|
7月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
136 0
|
7月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
213 0

相关实验场景

更多