flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题

本文的背景:


在搜集日志的过程中,日志文件的个数及日志文件需要不断的追加。flume1.6中,可以使用tail -f可以解决不断追加的文件,但是由于日志文件的个数是变化的,不可能只产生一个文件。所以tail -f就已经不能解决这个搜集日志的问题。


需求:

需要能够监控不断增加的文件,并且单个文件也是不断追加的

解决办法:

这时候flume1.7就产生了,很好的通过 TAILDIRl解决了这个问题。TAILDIRl可以监控一个目录下的文件。


官网地址:http://flume.apache.org/FlumeUserGuide.html


官网文档截图:

34b7237c1c79302ec8f91fdc2c01543b.jpg

上面加粗为常用属性。


这里我们只使用了下面两个属性

a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*

a1.sources.source1.type = TAILDIR


flume1.7安装包

链接:http://pan.baidu.com/s/1c1Pzo9i 密码:fxa4




一、Flume安装


1. 压缩安装包

tar -zxvf ~/jar/apache-flume-1.7.0-bin.tar.gz -C /data
mv /data/apache-flume-1.7.0-bin/ /data/flume-1.7.0 # 重命名



2. 配置环境变量

echo -e "export FLUME_HOME=/data/flume-1.7.0\nexport PATH=\$FLUME_HOME/bin:\$PATH" >> ~/.bashrc
source ~/.bashrc

3. 配置flume

cp flume-env.sh.template flume-env.sh修改JAVA_HOME
export JAVA_HOME= /data/jdk1.8.0_111



4. 验证安装

flume-ng version


二、Flume使用


一个agent由source、channel、sink组成。这儿我们使用Spooling Directory Source、File Channel、Kafka Sink。


1. 单节点的agent

1) 增加配置文件

cd $FLUME_HOME/conf
vim single_agent.conf



将以下内容拷贝进去

# agent的名称为a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# set source
#a1.sources.source1.type = spooldir
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
#a1.sources.source1.spoolDir=/data/aboutyunlog
a1sources.source1.fileHeader = flase
# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data
# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1


2. 创建所需文件

mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data



3. 查看kafka现有的topic

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list


4. 在kafka上创建名为aboutyunlog的topic

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3

5. 启动flume

flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console

启动过程中控制台会输出很多日志。


6. 创建一个kafka的consumer

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181  --topic aboutyunlog --from-beginning


这条命令的意思是说创建aboutyunlog这个topic下的消费者,消费时从最开始的一条信息开始消费。


上图说明该消费者创建成功,由于本地/data/aboutyunlog目录下没有新文件加入,造成aboutyunlog这个topic没有信息输入,所以消费者没有得到一条信息。

7.  添加文件到flume source目录

echo -e "this is a test file! \n[url]http://www.aboutyun.com20170820[/url]"
mv log.1 /data/aboutyunlog/

为:echo -e "this is a test file! \nhttp://www.aboutyun.com20170820">log.1

再次执行

echo -e "this is a test file! \n[url]http://www.aboutyun.com20170820[/url]">log.2

fe6a6677f1c6db529e56f0eaef2d26bb.jpg

然后我们看到


master上

14a1a14b194e10de92d9759164bc16b5.jpg

注意:需要通过xshell链接两个master。也就是打开两个master界面


8. 再次查看kafka consumer



切换到创建kafka consumer的shell界面,会看到我们log.1中文件的内容被打印在屏幕上。

bba7fbed3991211bc6d9b8c61a0f7729.jpg

上图说明我们已经成功使用flume监控/data/aboutyunlog目录,并将监控目录中的内容发送到kafka的aboutyunlog主题中。


注意:如果使用flume1.6会找不到类。

17/08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: TAILDIR, class: TAILDIR
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: TAILDIR
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more

所以需更换flume1.7


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
5天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
105 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
5天前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的控制文件与归档日志文件
本文介绍了Oracle数据库中的控制文件和归档日志文件。控制文件记录了数据库的物理结构信息,如数据库名、数据文件和联机日志文件的位置等。为了保护数据库,通常会进行控制文件的多路复用。归档日志文件是联机重做日志文件的副本,用于记录数据库的变更历史。文章还提供了相关SQL语句,帮助查看和设置数据库的日志模式。
【赵渝强老师】Oracle的控制文件与归档日志文件
|
3天前
|
Windows Python
如何反向读取Windows系统日志EVTX文件?
以下是如何反向读取Windows系统日志EVTX文件
12 2
|
5天前
|
Oracle 关系型数据库 数据库
【赵渝强老师】Oracle的参数文件与告警日志文件
本文介绍了Oracle数据库的参数文件和告警日志文件。参数文件分为初始化参数文件(PFile)和服务器端参数文件(SPFile),在数据库启动时读取并分配资源。告警日志文件记录了数据库的重要活动、错误和警告信息,帮助诊断问题。文中还提供了相关视频讲解和示例代码。
|
1月前
|
监控 Linux 应用服务中间件
系统监控:使用日志文件 journalctl的使用
本文介绍了如何使用`journalctl`命令来监控和查看Linux系统的日志文件,包括查看特定行数、过滤日志级别、实时跟踪日志、按时间段查询日志以及日志轮换和压缩的配置。
40 2
系统监控:使用日志文件 journalctl的使用
|
1天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL的撤销日志文件和错误日志文件
本文介绍了MySQL的物理存储结构,重点讲解了InnoDB存储引擎中的撤销日志文件(undo log)和错误日志文件。从MySQL 8.0开始,默认生成两个10MB的undo表空间文件,并支持动态扩容和收缩。错误日志文件记录了MySQL启动、运行、关闭过程中的问题,通过示例展示了如何查看和使用这些日志。
|
1月前
|
SQL 数据库
为什么 SQL 日志文件很大,我应该如何处理?
为什么 SQL 日志文件很大,我应该如何处理?
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
43 2
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
42 1
|
1月前
|
SQL 数据库
为什么SQL日志文件很大,该如何处理?
为什么SQL日志文件很大,该如何处理?