日志系统之Flume日志收集

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析、处理并最后将日志存储到目标存储引擎。针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase。

最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析、处理并最后将日志存储到目标存储引擎。针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase。我们在实时分析、存储这两个环节,选择跟业界的实践相同,但agent是团队自己写的,出于对多种数据源的扩展需求以及原来收集日志的方式存在的一些不足,于是调研了一下flume的agent。结果是flume非常契合我们的实际需求,并且拥有良好的扩展性与稳定性。于是打算采用flume的agent替换我们原先的实现。

本文介绍我们如何使用flume agent以及为了满足我们的需求进行了哪些扩展。备注:全文所指的flume均指flume-ng,版本基于1.6.0。

flume简介

flume 通过Agent对各个服务器上的日志进行收集,它依赖三大核心组件,它们分别是:source,channel,sink。它们之间的串联关系如下图:


之间的关系也比较简单:source负责应对各种数据源进行日志收集;channel负责日志的中间暂存,将日志收集跟日志发送解耦;sink负责日志的发送,将日志发送到目的地。更详细的讲解,请移步官网。下面谈谈,我们对flume的使用与扩展。

Source的扩展

Flume提供了一个基于跟踪文件夹内“文件个数”变动的source称之为Spool Directory Source。它跟踪目标日志文件夹,当有新的日志文件产生时就会触发对新日志文件的收集,但它不支持日志文件的追加。也就是说一旦它开始收集某个日志文件,那么这个日志文件就不能再被编辑,如果在读取日志文件的时候,日志文件产生了变动那么它将会抛出异常。也就是说,当收集到当日日志文件时,同时又有新的日志在往里面写入时,该source是不适合这种需求的。

如果你的需求是接近“准实时”的日志收集并且你非要用这个souce,应对的方案是:你只能选择将应用程序的日志框架(比如常用的log4j)的appender的“滚动机制”设置为按分钟滚动(也就是每分钟产生一个新日志文件)。这种机制不是不可行,但有些不足的地方,比如日志文件过多:当日志除了要被日志系统收集,还需要本地保留时,这种机制将非常难以接受。

我们希望日志文件按天滚动产生新的日志文件,当天的日志以追加的方式写入当天的日志文件并且Agent还要能够以接近实时的速度收集新产生的日志(追加)的。如果agent挂掉或者服务器宕机,日志文件不能丢失,agent能够自动跨日期收集。其实,spooling directory source已经为我们的实现提供了模板,但要进行一些改造,主要是以下几点:

(1)原先的Spooling Directory Source不支持对收集的日志文件的内容进行追加:


如果文件有任何改动,将以异常的形式抛出。此处需要移除异常

(2)对当日日志文件进行持续监控

原先的实现,当获取不到event直接删除或者重命名当前文件,并自动混动到下一个文件:

/* It's possible that the last read took us just up to a file boundary.
     * If so, try to roll to the next file, if there is one. */
    if (events.isEmpty()) {
      retireCurrentFile();
      currentFile = getNextFile();
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
      events = currentFile.get().getDeserializer().readEvents(numEvents);
    }

修改后的实现,当当前文件不是当天的日志文件时才处理当前文件并自动滚动到下一个文件,如果是当日文件,则继续跟踪:

if(!isTargetFile(currentFile) 		//	Only CurrentFile is no longer the target, at the meanwhile, next file exists.
	    && (isExistNextFile()) ){	//	Then deal with the history file(ever target file)
  logger.info("File:{} is no longer a TARGET File, which will no longer be monitored.", currentFile.get().getFile().getName());
  retireCurrentFile();
  currentFile = getNextFile();
}

flume 该source的源码见: github

另外此处,我们判断是否是目标文件(当日日志文件)的处理方式是比对服务器日期跟文件名中包含的日期是否一致:

private boolean isTargetFile(Optional<FileInfo> currentFile2) {
		
  String inputFilename = currentFile2.get().getFile().getName();
  SimpleDateFormat dateFormat = new SimpleDateFormat(targetFilename);
  String substringOfTargetFile = dateFormat.format(new Date());
		
  if(inputFilename.toLowerCase().contains(substringOfTargetFile.toLowerCase())){
    return true;
  }
		
  return false;
}

所以在新的配置里还需要加入日期格式的配置,通常是:yyyy-MM-dd。

Sink 的扩展

Sink在Flume的agent组件中充当数据输出的作用。在flume之前的版本(1.5.2)中已经对多个数据持久化系统提供了内置支持(比如hdfs/HBase等),但默认是没有kafka的。如果我们想将日志消息发送到kafka,就需要自己扩展一个kafkaSink。后来通过搜索发现在最新的stable release版本:1.6.0中,官方已经集成了kafkaSink。不过1.6.0是5月20号刚刚发布,官方的Download页面以及User Guide还没有进行更新,所以请在版本列表页面下载1.6.0版本。在下载到的安装包内有最新的KafkaSink介绍。

核心的配置有:brokerList(为了高可用性,flume建议至少填写两个broker配置)、topic。详见列表:


出于好奇心,在github上大概浏览了官方实现kafkaSink的源码,发现Event的Header部分并没有被打包进消息发送走:

        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }

        eventKey = headers.get(KEY_HDR);

        if (logger.isDebugEnabled()) {
          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
            + new String(eventBody, "UTF-8"));
          logger.debug("event #{}", processedEvents);
        }

        // create a message and add to buffer
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
        messageList.add(data);

这一点,可能并不满足我们的需求:我们需要消息头里的信息成为消息的一部分,然后在storm里针对header信息进行一些处理。比如:

(1)我们会默认在头里加入产生日志的服务器的Host,以便对日志进行分流或对没有存储host的日志进行“补偿”

(2)我们会默认在头里加入日志类型的标识,以便区分不同的日志并分流到不同的解析器进行解析

因为日志的来源以及形式是多样的,所以header里这些携带的信息是必要的。而flume官方的KafkaSink却过滤掉了header中的信息。因此,我们选择对其进行简单的扩张,将Event的header跟body打包成一个完整的json对象。具体的实现:

    private byte[] generateCompleteMsg(Map<String, String> header, byte[] body) {
        LogMsg msg = new LogMsg();
        msg.setHeader(header);
        msg.setBody(new String(body, Charset.forName("UTF-8")));

        String tmp = gson.toJson(msg, LogMsg.class);
        logger.info(" complete message is : " + tmp);
        return tmp.getBytes(Charset.forName("UTF-8"));
    }

                // create a message and add to buffer
                KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
                    (eventTopic, eventKey, generateCompleteMsg(headers, eventBody));
                messageList.add(data);

Interceptor使用

上面提到日志的源以及格式多种多样,我们不可能将所有工具、组件的日志格式按照我们想要的方式作格式化,特别是一些封闭的组件或线上的系统。很显然source跟sink只负责日志的收集和发送,并不会区分日志内容。而flume提供的Interceptor这一功能,给flume提供了更强大的扩展性。而我们拦击日志,并给其添加特定的header就是通过flume内置的几个interceptor实现的。我们应用了这么几个interceptor:

(1)host:往header中设置当前主机的Host信息;

(2)static:往header中设置一个预先配好的key-value对,我们用它来鉴别不同的日志源

(3)regex:通过将Event的body转换成一个UTF-8的字符串,然后匹配正则表达式,如果匹配成功,则可以选择放行或者选择删除

前两个interceptor我们之前已经提及过它的用途,而第三个我们用它来匹配日志中是否存在“DEGUG”字样的tag,如此存在,则删除该日志(这个是可选的)。

Selector 的使用

目前没有使用Selector的需求,不过它的用途也很常见:它可以用来选择Channel,如果你有多个Channel,并且是有条件得选择性发送的情况下,可以使用Selector来提高日志收集的灵活性。比如:如果你需要将不同不同日志源的日志发往不同的目的地可以建立多个channel然后按一定的规则来匹配,这里主要用到Multiplexing Channel Selector




原文发布时间为:2015-06-06


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
26天前
|
存储 前端开发 数据可视化
Grafana Loki,轻量级日志系统
本文介绍了基于Grafana、Loki和Alloy构建的轻量级日志系统。Loki是一个由Grafana Labs开发的日志聚合系统,具备高可用性和多租户支持,专注于日志而非指标,通过标签索引而非内容索引实现高效存储。Alloy则是用于收集和转发日志至Loki的强大工具。文章详细描述了系统的架构、组件及其工作流程,并提供了快速搭建指南,包括准备步骤、部署命令及验证方法。此外,还展示了如何使用Grafana查看日志,以及一些基本的LogQL查询示例。最后,作者探讨了Loki架构的独特之处,提出了“巨型单体模块化”的概念,即一个应用既可单体部署也可分布式部署,整体协同实现全部功能。
351 69
Grafana Loki,轻量级日志系统
|
1月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
91 8
|
2月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
121 2
|
3月前
|
存储 Linux Docker
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
372 2
|
4月前
|
XML JSON 监控
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
107 1
|
4月前
|
监控 网络协议 安全
Linux系统日志管理
Linux系统日志管理
97 3
|
4月前
|
监控 应用服务中间件 网络安全
#637481#基于django和neo4j的日志分析系统
#637481#基于django和neo4j的日志分析系统
65 4
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
85 2
|
4月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
86 1
|
5月前
|
JSON 缓存 fastjson
一行日志引发的系统异常
本文记录了一行日志引发的系统异常以及作者解决问题的思路。
115 11

热门文章

最新文章