bigdata-13-Flume实战

简介: bigdata-13-Flume实战

实战1:采集文件内容上传到HDFS

需求

采集目录中已有的文件内容,存储到HDFS

分析

source是要基于目录的,channel建议使用file,可以保证不丢数据,sink使用hdfs

source使用Spooling Directory Source

配置Agent

从example.conf 复制一个新的配置文件

创建数据目录

文件配置信息如图

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir
# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/data
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.197.101:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stu-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

主要配置checkpointDir和dataDir,

因为这两个目录默认会在用户家目录下生成,

建议修改到其他地方

checkpointDir是存放检查点目录

data是存放数据的目录

最后是sink 因为要向hdfs中输出数据,所以可以使用hdfssink

hdfs.path:是必填项,指定hdfs上的存储目录

filePrefix参数:文件前缀,这个属于可选项

writeFormat:建议改为Text,后期Hive和Impala可以操作这份数据

fileType:默认是SequenceFile,还支持DataStream 和 CompressedStream,DataStream 不会对输出 数据进行压缩,CompressedStream 会对输出数据进行压缩,在这里我们先不使用压缩格式的,所以选 择DataStream

hdfs.rollInterval:默认值是30,单位是秒,表示hdfs多长时间切分一个文件,因为这个采集程序是一 直运行的,只要有新数据,就会被采集到hdfs上面,hdfs默认30秒钟切分出来一个文件,如果设置 为0表示不按时间切文件

hdfs.rollSize:默认是1024,单位是字节,最终hdfs上切出来的文件大小都是1024字节,如果设置为0 表示不按大小切文件

hdfs.rollCount:默认设置为10,表示每隔10条数据切出来一个文件,如果设置为0表示不按数据条数 切文件 这三个参数,如果都设置的有值,哪个条件先满足就按照哪个条件都会执行。 在实际工作中一般会根据时间或者文件大小来切分文件,我们之前在工作中是设置的时间和文件大小 相结合,时间设置的是一小时,文件大小设置的128M,这两个哪个满足执行哪个 所以针对hdfssink的配置最终是这样的。

在启动前,我们初始化一份测试数据

接来下启动Agent

flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file ../conf/file-to-hdfs.conf -Dflume.root.logger=INFO,console

Flume Agent启动成功,成功将数据文件写入HDFS

此时发现文件已经生成了,只不过默认情况下现在的文件是 .tmp 结尾的,表示它在被使用,

因为Flume 只要采集到数据就会向里面写,这个后缀默认是由 hdfs.inUseSuffix 参数来控制的。

文件名上还拼接了一个当前时间戳,这个是默认文件名的格式,当达到文件切割时机的时候会给文件改名 字,去掉.tmp 这个文件现在也是可以查看的,里面的内容其实就是class1.dat文件中的内容。

查看文件内容:表明我们此次采集结果是成功的

那Flume怎么知道哪些文件是新文件呢?它会不会重复读取同一个文件的数据呢?

我们进入目录:/data/log/studentDir

我们发现此时这个文件已经被加了一个后缀 .COMPLETED ,表示这个文件已经被读取过了,所以Flume在 读取的时候会忽略后缀为 .COMPLETED 的文件

接着我们再看一下channel中的数据,因为数据是存在本地磁盘文件中的,所以是可以去看一下的,进入 dataDir指定的目录

发现里面有一个 log-1 的文件,这个文件中存储的其实就是读取到的内容,不过在这无法直接查看

现在我们想看一下Flume最终生成的文件是什么样子的,难道要根据配置等待1个小时或者弄一个128M 的文件过来吗, 其实也没必要,我们可以暴力操作一下 停止Agent就可以看到了,当Agent停止的时候就会去掉 .tmp 标志了

那我再重启Agent之后,会不会再给加上.tmp呢,不会了,每次停止之前都会把所有的文件解除占用状 态,下次启动的时候如果有新数据,则会产生新的文件,这其实就模拟了一下自动切文件之后的效果。

实战2:日志数据汇总采集

需求

  • 将bigdata02和bigdata03机器实时产生的日志数据汇总到bigdata04中
  • 通过bigdata04将数据输出到HDFS指定目录

这里注意:HDFS目录要按天生产每天一个目录。

分析

图解:

这里需要用到3个Agent

  • Agent1负责采集机器bigdata02数据
  • Agent2负责采集机器bigdata03数据
  • Agent3负责汇总机器1和2数据到机器3再统一输出到HDFS
  • Agent1和Agent2因为要实时读取文件中新增数据,所以使用基于文件的source,Exec Source。
  • Channel统一使用基于内存的Channel-Memory Channel
  • 由于需要汇总数据,所以sink端加快传输使用Avro Sink
  • 备注:Avro是一种序列化的手段,经过序列化的数据进行传输的时候效率非常高,Avro Sink发送的数据可以直接被Avro Source接受,无缝衔接

实战

以下定义02为A、03为B、04为C

首先在02机器上配置Flume

配置Agent,创建文件 file-to-avro-104.conf

在03机器上配置Flume

与02机器一样的操作

配置Agent,创建文件file-to-avro-104.conf

在04机器上配置文件avro-to-hdfs.conf

这里有个注意的点:

在指定Agent中sink配置的时候注意,我们的需求是需要按天在hdfs中创建目录,并把当天的数据上传到 当天的日期目录中,这也就意味着hdfssink中的path不能写死,需要使用变量,动态获取时间,查看官 方文档可知,在hdfs的目录中需要使用%Y%m%d。

这个时间其实是需要从数据里面抽取,咱们前面 说过数据的基本单位是Event,Event是一个对象,后面我们会详细分析,在这里大家先知道它里面包含 的既有我们采集到的原始的数据,还有一个header属性,这个header属性是一个key-value结构的,我 们现在抽取时间就需要到event的header中抽取,但是默认情况下event的header中是没有日期的,强行 抽取是会报错的,会提示抽取不到,返回空指针异常。

其实官方文档中也说了,可以使用hdfs.useLocalTimeStamp或者时间 拦截器,暂时最简单直接的方式就是使用hdfs.useLocalTimeStamp,这个属 性的值默认为false,需要改为true

三台机器中的Flume Agent都配置好了,在开始启动之前需要先在bigdata02和bigdata03中生成测试数 据,为了模拟真实情况,在这里我们就开发一个脚本,定时向文件中写数据。

#!/bin/bash
# 循环向文件中生成数据
while [ "1" = "1" ]
do
# 获取当前时间戳
curr_time=`date +%s`
# 获取当前主机名
name=`hostname`
echo ${name}_${curr_time} >> /data/log/access.log
# 暂停1秒
sleep 1
done

1.首先在bigdata02上创建/data/log目录,然后创建 generateAccessLog.sh 脚本

2.接着在bigdata03上创建/data/log目录,然后创建 generateAccessLog.sh 脚本

3.接下来开始启动相关的服务进程 首先启动bigdata04上的agent服务

这里要注意下启动顺序

首先应该启动的是04机器、如果没有启动04就启动了02和03,会丢失一部分数据

  • 启动04
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file avro-to-hdfs.conf -Dflume.root.logger=INFO,console

  • 启动03
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file file-to-avro-104.conf -Dflume.root.logger=INFO,console

初始化测试数据

sh -x generateAccessLog.sh

  • 启动02
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file file-to-avro-104.conf -Dflume.root.logger=INFO,console

初始化测试数据

sh -x generateAccessLog.sh

验证数据结果

启动之后稍等一会就可以看到数据了,我们观察数据的变化,会发现hdfs中数据增长的不 是很快,它会每隔一段时间添加一批数据,实时性好像没那么高

注意

这是因为avrosink中有一个配置batch-size,它的默认值是100,也就是每次发送100条数据,如果数据 不够100条,则不发送。 具体这个值设置多少合适,要看你source数据源大致每秒产生多少数据,以及你希望的延迟要达到什么 程度,如果这个值设置太小的话,会造成sink频繁向外面写数据,这样也会影响性能。

实战结束

最终,依次停止bigdata02、bigdata03中的服务,最后停止bigdata04中的服务

目录
相关文章
|
29天前
bigdata-12-Flume核心组件
bigdata-12-Flume核心组件
24 0
|
29天前
|
存储 数据采集 JSON
bigdata-14-Flume高级组件
bigdata-14-Flume高级组件
25 1
|
29天前
|
消息中间件 存储 数据采集
bigdata-11-Flume入门与部署
bigdata-11-Flume入门与部署
22 0
|
8月前
|
SQL 分布式计算 监控
Flume学习--1、Flume概述、Flume入门、(一)
Flume学习--1、Flume概述、Flume入门、(一)
|
4月前
|
存储 分布式计算 监控
Flume(一)【Flume 概述】
Flume(一)【Flume 概述】
|
4月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.20 安装Flume
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
54 0
Hadoop学习笔记(HDP)-Part.20 安装Flume
|
8月前
|
JSON 监控 Unix
Flume学习--1、Flume概述、Flume入门、(二)
Flume学习--1、Flume概述、Flume入门、(二)
|
存储 缓存 分布式计算
入门Flume
你好看官,里面请!今天笔者讲的是入门Flume。不懂或者觉得我写的有问题可以在评论区留言,我看到会及时回复。 注意:本文仅用于学习参考,不可用于商业用途,如需转载请跟我联系。
239 1
|
分布式计算 监控 Java
Flume笔记
Flume笔记
101 0
Flume笔记
|
存储 消息中间件 数据采集
Flume基础
Flume是数据采集,日志收集的框架,通过分布式形式进行采集,(高可用分布式)
226 0
Flume基础