【Flume】(六)Flume 开发实战案例分享2

简介: 【Flume】(六)Flume 开发实战案例分享2


三、实时读取目录文件到 HDFS 案例


3.1 案例需求:


使用 Flume 监听整个目录的文件。


3.2 需求分析:



3.3 实现步骤:


1.创建配置文件 flume-dir-hdfs.conf


创建一个文件


[root@hadoop102 job]$ touch flume-dir-hdfs.conf

打开文件


[root@hadoop102 job]$ vim flume-dir-hdfs.conf

添加如下内容


a3.sources = r3 
a3.sinks = k3 
a3.channels = c3
# Describe/configure the source 
a3.sources.r3.type = spooldir 
a3.sources.r3.spoolDir = /opt/module/flume/upload 
a3.sources.r3.fileSuffix = .COMPLETED 
a3.sources.r3.fileHeader = true 
#忽略所有以.tmp 结尾的文件,不上传 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) 
# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个 Event 才 flush 到 HDFS 一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 600 
#设置每个文件的滚动大小大概是 128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与 Event 数量无关 
a3.sinks.k3.hdfs.rollCount = 0 
#最小冗余数 
a3.sinks.k3.hdfs.minBlockReplicas = 1 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

2.启动监控文件夹命令

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

说明: 在使用 Spooling Directory Source 时


不要在监控目录中创建并持续修改文件

上传完成的文件会以.COMPLETED 结尾

被监控文件夹每 500 毫秒扫描一次文件变动


3.向 upload 文件夹中添加文件


在/opt/module/flume 目录下创建 upload 文件夹

[root@hadoop102 flume]$ mkdir upload

向 upload 文件夹中添加文件

[root@hadoop102 upload]$ touch demo.txt 
[root@hadoop102 upload]$ touch demo.tmp 
[root@hadoop102 upload]$ touch demo.log

4.查看 HDFS 上的数据



5.等待 1s,再次查询 upload 文件夹


[root@hadoop102 upload]$ ll 
总用量 0 -rw-rw-r--. 1 demodemo 0 5 月 20 22:31 demo.log.COMPLETED 
-rw-rw-r--. 1 demodemo0 5 月 20 22:31 demo.tmp 
-rw-rw-r--. 1 demodemo0 5 月 20 22:31 demo.txt.COMPLETED

四、单数据源多出口案例(选择器)


单 Source 多 Channel、Sink 如图所示。



4.1 案例需求:


使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。


4.2 需求分析:



4.3 实现步骤:


0.准备工作


在/opt/module/flume/job 目录下创建 group1 文件夹


[root@hadoop102 job]$ cd group1/

在/opt/module/datas/目录下创建 flume3 文件夹


[root@hadoop102 datas]$ mkdir flume3

1.创建 flume-file-flume.conf


配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。


创建配置文件并打开


[root@hadoop102 group1]$ touch flume-file-flume.conf 
[root@hadoop102 group1]$ vim flume-file-flume.conf

添加如下内容


# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 k2 
a1.channels = c1 c2 
# 将数据流复制给所有 channel 
a1.sources.r1.selector.type = replicating 
# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log 
a1.sources.r1.shell = /bin/bash -c 
# Describe the sink
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = hadoop102 
a1.sinks.k2.port = 4142 
# Describe the channel 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000 
a1.channels.c2.transactionCapacity = 100 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2


注:Avro 是由 Hadoop 创始人 Doug Cutting 创建的一种语言无关的数据序列化和 RPC 框 架。


注:RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程 序上请求服务,而不需要了解底层网络技术的协议。


2.创建 flume-flume-hdfs.conf


配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。


创建配置文件并打开


[root@hadoop102 group1]$ touch flume-flume-hdfs.conf 
[root@hadoop102 group1]$ vim flume-flume-hdfs.conf

添加如下内容


# Name the components on this agent 
a2.sources = r1 
a2.sinks = k1 
a2.channels = c1 
# Describe/configure the source 
a2.sources.r1.type = avro 
a2.sources.r1.bind = hadoop102 
a2.sources.r1.port = 4141
# Describe the sink 
a2.sinks.k1.type = hdfs 
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H 
#上传文件的前缀 
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照时间滚动文件夹 
a2.sinks.k1.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位 
a2.sinks.k1.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a2.sinks.k1.hdfs.useLocalTimeStamp = true 
#积攒多少个 Event 才 flush 到 HDFS 一次 
a2.sinks.k1.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a2.sinks.k1.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a2.sinks.k1.hdfs.rollInterval = 600 
#设置每个文件的滚动大小大概是 128M 
a2.sinks.k1.hdfs.rollSize = 134217700 
#文件的滚动与 Event 数量无关 
a2.sinks.k1.hdfs.rollCount = 0 
#最小冗余数 
a2.sinks.k1.hdfs.minBlockReplicas = 1 
# Describe the channel 
a2.channels.c1.type = memory 
a2.channels.c1.capacity = 1000 
a2.channels.c1.transactionCapacity = 100 
# Bind the source and sink to the channel 
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1


3.创建 flume-flume-dir.conf


配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。


创建配置文件并打开


[root@hadoop102 group1]$ touch flume-flume-dir.conf 
[root@hadoop102 group1]$ vim flume-flume-dir.conf

添加如下内容


# Name the components on this agent 
a3.sources = r1 
a3.sinks = k1 
a3.channels = c2 
# Describe/configure the source 
a3.sources.r1.type = avro 
a3.sources.r1.bind = hadoop102 
a3.sources.r1.port = 4142 
# Describe the sink 
a3.sinks.k1.type = file_roll 
a3.sinks.k1.sink.directory = /opt/module/datas/flume3 
# Describe the channel 
a3.channels.c2.type = memory 
a3.channels.c2.capacity = 1000 
a3.channels.c2.transactionCapacity = 100 
# Bind the source and sink to the channel 
a3.sources.r1.channels = c2 
a3.sinks.k1.channel = c2


提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。


4.执行配置文件


分别开启对应配置文件:flume-flume-dir,flume-flume-hdfs,flume-file-flume。


[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf 
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf 
[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

5.启动 Hadoop 和 Hive

[root@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh 
[root@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh 
[root@hadoop102 hive]$ bin/hive hive (default)>

6.检查 HDFS 上数据



7.检查/opt/module/datas/flume3 目录中数据

[root@hadoop102 flume3]$ ll 总用量 8 -rw-rw-r--. 1 demo demo 5942 5 月 22 00:09 1526918887550-3


目录
相关文章
|
29天前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
34 1
|
1月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
2月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
39 1
|
2月前
|
存储 SQL Shell
bigdata-13-Flume实战
bigdata-13-Flume实战
26 0
|
2月前
|
XML 数据格式
Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
【2月更文挑战第19天】Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
35 1
|
5月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
49 0
|
5月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
82 0
|
5月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
67 0
|
5月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
58 0
|
5月前
|
存储 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
60 0

相关实验场景

更多