Flume介绍
1.Source 用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据流传输到 Channel, 这个有点类似于 Java IO 部分的 Channel。
2.Channel 用于桥接 Sources 和 Sinks,类似于一个队列。
3.Sink 从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS 或者 HBase)。
4.Event 传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。
Flume安装
环境:
hadoop-2.7.2
apache-flume-1.7.0
资料下载:
学习视频下载
链接:https://pan.baidu.com/s/1TdHMihOPb0hHt6L5OmyTnA
提取码:o5h2
复制这段内容后打开百度网盘手机App,操作更方便哦
apache-flume-1.7.0-bin.tar.gz
链接:https://pan.baidu.com/s/1f9wcALqEHpFNIrTkrPd0XA
提取码:vujs
复制这段内容后打开百度网盘手机App,操作更方便哦
flume-hadoop-jar
链接:https://pan.baidu.com/s/1n0YVyRj4uqdVRhG5NyxmZA
提取码:axyc
复制这段内容后打开百度网盘手机App,操作更方便哦
注意事项:
安装包放在 /opt/software
安装在 /opt/module
看到端口8020和9000的配置时候仔细看一下文字,看到端口8020和9000的配置时候仔细看一下文字,看到端口8020和9000的配置时候仔细看一下文字(有坑)
安装过程:
将apache-flume-1.7.0 上传到 /opt/software下
将apache-flume-1.7.0 解压到 /opt/module 下 ,将解压后的文件夹重命名为flume
将flume/conf/flume-env.sh.template 重命名为 flume-env.sh
修改flume-env.sh之前
修改flume-env.sh之后
案例一:flume监控本地hello.txt(某某.log)文件并且上传到HDFS
拷贝Hadoop相关jar到 Flume的 lib目录下
jar列表
找别的版本的jar包解决方法
进入到hadoop目录下
用find 模糊查询名字,一个一个查询
编写flume-localfile-hdfs.conf
监控的文件的路径:
a2.sources.r2.command = tail -F /opt/module/flume/temp/hello.txt
hdfs路径(有坑)(有坑)(有坑):
有的视频教程的端口是8020,即( hdfs://你的ip地址:8020/flume/%Y%m%d/%H),如果你按照这个端口配置出现服务正常开启,但是数据写入不到HDFS中,那就是是这个端口的问题,端口的值应该与下面当初配置hdaoop的值一样,如下图中的值,
a2.sinks.k2.hdfs.path = hdfs://你的ip地址:9000/flume/%Y%m%d/%H
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/flume/temp/hello.txt a2.sources.r2.shell = /bin/bash -c # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://你的ip地址:9000/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 600 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k2.hdfs.rollCount = 0 #最小副本数 a2.sinks.k2.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
执行监控任务
其中: job/flume-localfile-hdfs.conf是上面文件的路径
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-localfile-hdfs.conf
结果展示(文字介绍)
HDFS会有 /flume/%Y%m%d/%H 目录 , 在本地的hello.txt里面添加数据 HDFS目录下的文件也会添加相应的数据
案例二:实时读取整个目录下的文件到 HDFS
创建要监控的文件夹
在 flume 文件下创建 upload 文件夹,如下图所示,其实在哪创建名字为某某的文件夹都无所谓
编写 flume-folder-hdfs.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/module/flume/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp 结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://47.105.132.96:9000/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount = 0 #最小冗余数 a3.sinks.k3.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
执行监控任务
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-folder-hdfs.conf
结果展示(文字介绍)
HDFS上会创建相应的目录,在upload文件夹中创建的文件存在HDFS中合并成一个文件,并且本地文件上传成功的文件用.COMPLETED结尾
案例三:Flume 与 Flume 之间数据传递:单 Flume 多 Channel、 Sink
监控 /opt/module/flume/temp/hello.txt 并且上传到HDFS和本地的/opt/module/flume/temp2下
fiume1.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给多个 channel a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/flume/temp/hello.txt a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = iZm5ea99qngm2v98asii1aZ a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = iZm5ea99qngm2v98asii1aZ a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
flume2.conf
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = iZm5ea99qngm2v98asii1aZ a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://iZm5ea99qngm2v98asii1aZ:9000/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是 128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k1.hdfs.rollCount = 0 #最小冗余数 a2.sinks.k1.hdfs.minBlockReplicas = 1 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
fiume3.conf
注意: a3.sinks.k1.sink.directory = /opt/module/flume/temp2 的目录自己创建,否则不会成功
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = iZm5ea99qngm2v98asii1aZ a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/flume/temp2 # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
执行监控任务:先启动flume1,在启动23
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/groupjob/flume1.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/groupjob/flume2.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/groupjob/flume3.conf
结果展示(文字介绍)
HDFS中写入数据,本地指定的文件夹中也有数据
遇到的坑
问题1:安装flume后数据写入不到HDFS,启动不报错
解决办法:conf中的端口应该和配置hadoop中core-site.xml(图二)中端口一致
参考文献: 解决flume采集文件写不到hdfs里面问题_晓风岚月-CSDN博客_flume不能写入hdfs