@[toc]
前言:如果你在编写 Flume 配置文件时,分不清 Source
、Channel
、Sink
三者之间的关系,不懂得 Flume 配置文件的编写流程,无从下手,那么你可以选择看看这篇文章,能对你有所帮助。
Flume 配置的三大部分
Flume 的配置文件主要包含三大部分,分别是:
数据源
Source
:负责从多种数据源收集数据,如日志文件、网络流、系统日志等。数据通道
Channel
:通道负责将收集到的数据暂存起来,进行缓冲,确保数据不会丢失。通道可以是内存通道、文件通道或者 Kafka 等其他队列。数据处理
Sink
:数据处理器,负责将数据发送到目标位置进行存储,比如 HDFS、HBase、Kafka 等。
需要注意的是,并不是所有的 Flume 配置都需要包含这三部分,具体的配置取决于用户的需求和使用情况,这个后面再进行讨论。
Flume 配置文件编写全流程
1.查
没错,第一步就是查官方文档 —— FlumeUserGuide 根据官方文档提供的样例以及参数说明来进行配置。
2. 抄
抄就完了,官方提供了一个样例 A simple example
,如下所示:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这个就是 Flume 经典的案例,配置从本地 44444
端口采集数据,并将数据通过内存通道缓冲,最后将数据记录到日志文件中。几乎所有的 Flume 配置都可以通过这个模板进行更改,只是参数略有不同罢了。
看完上面的经典案例,那么就来实操一个配置文件吧,配置一个将 HTTP 数据采集到 HDFS 上的案例,并通过本地文件进行缓冲(通道)。
本文不对参数的具体作用进行说明,只讲述配置流程,如有需要,可以查阅官网进行学习。
3. 声明
首先需要进行组件名称的声明,也就是像我们写代码定义变量名一样(套模板就完事了)。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
其中 a1.sources = r1
代表定义了一个名称为 r1
的源数据组件,其它两个是同样的意思。我们也可以配置多个组件,例如:a1.sources = r1 r2
,这样就配置了两个源数据组件。
4.Source 源配置
这个地方需要根据我们具体的业务场景来进行配置,假设当前需要采集的是 HTTP
中的数据,那么我们就可以在官网中搜索 HTTP Source
。你也可以在官网页面 —— FlumeUserGuide 中搜索 Flume Sources
,可以在其中找到所有 Flume 提供的 Sources
配置方案及说明。
其中加粗的部分是必填项,其它参数都有默认值,你可以根据右侧的说明进行调整。
Flume 贴心的为每个配置都提供了样例进行参考,如下所示(HTTP Source
的参考配置):
我这里配置如下:
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost
其中 r1
就是我们上面定义的源数据组件名称,其它都为参数的固定写法。
5.Sink 存储配置
同样的,我们可以在官网中搜索 HDFS Sink
或者在官网页面 —— FlumeUserGuide 中搜索 Flume Sinks
,可以在其中找到所有 Flume 提供的 Sink
配置方案及说明。
找到之后,直接来看看 Flume 提供的模板:
在其基础上进行修改:
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
其中 k1
就是我们上面定义的存储组件名称,其它都为参数的固定写法。
6.Channel 通道配置
同样的,我们可以在官网页面 —— FlumeUserGuide 中搜索 Flume Channels
,可以在其中找到所有 Flume 提供的 Channels
配置方案及说明。
一样的,找到之后,直接来看看 Flume 提供的模板:
我们根据模板来进行修改:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data
其中 c1
就是我们上面定义的通道组件名称,其它都为参数的固定写法。
7.组装
三大组件部分编写完成后,还需要进行组装,将源(Source)和存储(Sink)与通道(Channel)进行绑定,以构建数据的流动路径。
你可以想象一个蓄水池,源是河流,通道是水管,存储是蓄水池,人们利用水管将水流引入蓄水池中,据此来进行绑定,它们才知道谁是谁。
通过组装,Flume 可以确保数据从源处接收并经过通道传输,最终进行存储(汇总)。
那么现在对我们的案例来进行组装,如下所示:
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将源和汇都绑定上通道。
测试
现在,我们整个配置文件都已经编写完成了,如下所示:
# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost
# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data
# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将该配置文件命名为 httpToHDFS.conf
,并将其存储在 $FLUME_HOME/conf
目录中,下面对该文件进行运行测试。
首先,我们在前台启动 Flume,指定刚刚配置的文件:
cd $FLUME_HOME
./bin/flume-ng agent -n a1 -c conf/ -f conf/httpToHDFS.conf -Dflume.root.logger=INFO,console
命令解析
./bin/flume-ng agent
: 这是启动 Flume 代理的命令。-n a1
:-n
参数指定了代理的名称,这里指定的名称是a1
。该名称将在配置文件中使用,用于引用代理中的各个组件。-c conf/
:-c
参数指定了Flume配置文件的路径。在这个例子中,配置文件位于conf/
目录下。-f conf/httpToHDFS.conf
:-f
参数指定了要使用的 Flume 配置文件的文件路径。在这个例子中,使用的配置文件名为httpToHDFS.conf
,位于conf/
目录下。-Dflume.root.logger=INFO,console
:这是设置 Flume 日志级别和日志输出方式的参数。INFO
表示日志级别为 INFO,console
表示日志将输出到控制台,可以根据需要调整日志级别和输出方式。
启动完成后如下所示:
下面我们通过其它窗口使用 curl
命令模拟发送数据到 HTTP 源:
curl -X POST -d'[{"body":"hello body"}]' http://localhost:5140
curl -X POST -d'[{"body":"hello flume"}]' http://localhost:5140
正常情况下,执行完命令之后 Flume 就会监听并采集到该数据了,如下所示:
我们可以通过 Hadoop 命令查看该路径下存储的内容:
hdfs dfs -text /flume/events/2024-04-02/2300/00/ev*
注意,该路径并不是固定的,请修改成你自己的路径。
可以看到,Flume 已经成功将数据采集并存储到 HDFS 上了。
灵魂拷问,现在你会编写 Flume 配置文件了吗? qwq