Flume监听文件夹中的文件变化,并把文件下沉到hdfs

简介: 1、采集目录到HDFS采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去 根据需求,首先定义以下3大要素 采集源,即source——监控文件目录 : spooldir 下沉目标,即sink——HDFS文件系统 : hdfs sink source和sink之间的传递通道——channel,可用file chann

1、采集目录到HDFS

采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
根据需求,首先定义以下3大要素
采集源,即source——监控文件目录 : spooldir
下沉目标,即sink——HDFS文件系统 : hdfs sink
source和sink之间的传递通道——channel,可用file channel 也可以用内存channel

配置文件spooldir-hdfs.conf编写:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
## 通过spooldir来监控文件内容的变化
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/tuzq/software/flumedata
a1.sources.r1.fileHeader = true

# Describe the sink
## 表示下沉到hdfs,下面配置的类型不同,type下面的参数就不同
a1.sinks.k1.type = hdfs
#sinks.k1只能连接一个channel,source可以配置多个
a1.sinks.k1.channel = c1
#下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是动态变化的。表示输出的目录名称是可变的
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
#表示文件的前缀
a1.sinks.k1.hdfs.filePrefix = events-
#表示到了需要触发的时间时,是否要更新文件夹,true:表示要更新
a1.sinks.k1.hdfs.round = true
##表示每隔1分钟改变一下文件夹
a1.sinks.k1.hdfs.roundValue = 1
##切换文件的时候单位是分钟
a1.sinks.k1.hdfs.roundUnit = minute
##表示只要过了3秒钟,就切换生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 3
##如果记录的文件大于20字节时切换一次
a1.sinks.k1.hdfs.rollSize = 20
##当写了5个事件时触发
a1.sinks.k1.hdfs.rollCount = 5
##收到了多少条消息往hdfs中追加内容
a1.sinks.k1.hdfs.batchSize = 1
#使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
##使用内存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Channel参数解释:
capacity:默认该通道中最大的可以存储的event数量
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的允许时间

执行命令

[root@hadoop1 apache-flume-1.6.0-bin]#cd /home/tuzq/software/apache-flume-1.6.0-bin
[root@hadoop1 apache-flume-1.6.0-bin]#bin/flume-ng agent -c ./conf -f ./agentconf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console;

接着往/home/tuzq/software/flumedata文件夹中扔文件

[root@hadoop1 flumedata]# pwd
/home/tuzq/software/flumedata
[root@hadoop1 flumedata]# echo 111111111 >> 1.txt
[root@hadoop1 flumedata]# ls
1.txt.COMPLETED  test.log.COMPLETED
[root@hadoop1 flumedata]# echo 22222222 >> 2.txt
[root@hadoop1 flumedata]# echo 33333333 >> 3.txt
[root@hadoop1 flumedata]# echo 44444444 >> 4.txt
[root@hadoop1 flumedata]# ls
1.txt.COMPLETED  2.txt.COMPLETED  3.txt.COMPLETED  4.txt.COMPLETED  test.log.COMPLETED
[root@hadoop1 flumedata]#

扔了之后,现象是
1、/home/tuzq/software/flumedata文件文件夹下的文件倍加了一个一个后缀.COMPLETED,
2、在flume的监控位置,出现类似下图一样的文件:
3、到hdfs上查看文件:

[root@hadoop1 flumedata]# hdfs dfs -ls /
Found 5 items
drwxr-xr-x   - root supergroup          0 2017-06-13 12:01 /40000
drwxr-xr-x   - root supergroup          0 2017-06-13 23:43 /flume
-rw-r--r--   3 root supergroup       3719 2017-06-10 12:11 /kms.sh
drwxrwxrwx   - root supergroup          0 2017-06-10 22:06 /tmp
drwxr-xr-x   - root supergroup          0 2017-06-10 22:27 /user
[root@hadoop1 flumedata]# hdfs dfs -ls /flume
Found 2 items
drwxr-xr-x   - root supergroup          0 2017-06-13 23:43 /flume/events
drwxr-xr-x   - root supergroup          0 2017-06-13 22:01 /flume/tailout
[root@hadoop1 flumedata]# hdfs dfs -ls /flume/events
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-06-13 23:47 /flume/events/17-06-13
[root@hadoop1 flumedata]# hdfs dfs -ls /flume/events/17-06-13
Found 3 items
drwxr-xr-x   - root supergroup          0 2017-06-13 23:43 /flume/events/17-06-13/2343
drwxr-xr-x   - root supergroup          0 2017-06-13 23:46 /flume/events/17-06-13/2346
drwxr-xr-x   - root supergroup          0 2017-06-13 23:47 /flume/events/17-06-13/2347
[root@hadoop1 flumedata]#

综上所述:说明通过flume已经把新增的文件下沉到了hdfs中。

目录
相关文章
|
2月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
10天前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL JSON 数据处理
实时计算 Flink版产品使用问题之把hdfs集群里的core-site.xml hdfs.xml两个文件放到flink/conf/目录下,启动集群说找不到hdfs,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
存储 分布式计算 NoSQL
|
1月前
|
分布式计算 Hadoop Java
分布式系统详解--框架(Hadoop--JAVA操作HDFS文件)
分布式系统详解--框架(Hadoop--JAVA操作HDFS文件)
27 0
|
2月前
|
SQL 分布式计算 监控
Flume实时读取本地/目录文件到HDFS
Flume实时读取本地/目录文件到HDFS
67 7
|
2月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
11月前
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
2月前
|
SQL 数据采集 数据挖掘
nginx+flume网络流量日志实时数据分析实战
nginx+flume网络流量日志实时数据分析实战
137 0
|
11月前
|
消息中间件 数据采集 JSON
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)