Filebeat 关键字多行匹配日志采集(multiline与include_lines)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介:

很多同事认为filebeat采集日志不能做到多行处理,今天这里讨论下filebeat的multiline与include_lines。

 

先来个案例,以下日志,我们只要求采集error的字段,

1
2
3
2017 /06/22  11:26:30 [error] 26067 #0: *17918 connect() failed (111: Connection refused) while connecting to upstream, client: 192.168.32.17, server: localhost, request: "GET /wss/ HTTP/1.1", upstream: "http://192.168.12.106:8010/", host: "192.168.12.106"
2017 /06/22  11:26:30 [info] 26067 #0:
2017 /06/22  12:05:10 [error] 26067 #0: *17922 open() "/data/programs/nginx/html/ws" failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: "GET /ws HTTP/1.1", host: "192.168.12.106"


filebeat.yml文件配置如下:

1
2
3
4
5
6
7
8
9
filebeat.prospectors:
- input_type: log
   paths:
     /tmp/test .log
   include_lines: [ 'error' ]
output.kafka:
   enabled:  true
   hosts: [ "192.168.12.105:9092" ]
   topic: logstash-errors-log

查看下kafka队列

果然只有“error”关键字的日志被采集了

1
2
{ "@timestamp" : "2017-06-23T08:57:25.227Z" , "beat" :{ "name" : "192.168.12.106" }, "input_type" : "log" , "message" : "2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106" , "offset" :30926, "source" : "/tmp/test.log" , "type" : "log" }
{ "@timestamp" : "2017-06-23T08:57:32.228Z" , "beat" :{ "name" : "192.168.12.106" }, "input_type" : "log" , "message" : "2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106" , "offset" :31342, "source" : "/tmp/test.log" , "type" : "log" }


再来多行案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***],  source [{***}}
MapperParsingException[Field name [events.created] cannot contain  '.' ]
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
     at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
     at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
     at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
     at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)
     at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
     at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
     at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
     at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
     at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
     at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

filebeat.yml文件配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filebeat.prospectors:
- input_type: log
   paths:
     /tmp/test .log
    multiline:
         pattern:  '^\['
         negate:   true
         match:   after
   fields:
     beat.name: 192.168.12.106
   fields_under_root:  true
output.kafka:
   enabled:  true
   hosts: [ "192.168.12.105:9092" ]
   topic: logstash-errors-log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kafka队列如下:
 
{ "@timestamp" : "2017-06-23T09:09:02.887Z" , "beat" :{ "name" : "192.168.12.106" }, "input_type" : "log" ,
"message" :"[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***],  source [{***}}\n
MapperParsingException[Field name [events.created] cannot contain  '.' ]\n    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n    
at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)\n    
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)\n    
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)\n    
at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)\n    
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)\n    
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)\n   
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)\n    
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)\n    
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)\n    
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)\n   
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n    
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n    
at java.lang.Thread.run(Thread.java:745)\n\n\n\n "," offset ":35737," source ":" /tmp/test .log "," type ":" log"}

可以看出multiline将多行日志汇总。


multiline与include_lines,结合使用。

filebeat.yml文件配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
filebeat.prospectors:
- input_type: log
   paths:
     /tmp/test .log
   include_lines: [ 'error' ]
   multiline:
         pattern:  '^\['
         negate:   true
         match:   after
output.kafka:
   enabled:  true
   hosts: [ "192.168.12.105:9092" ]
   topic: logstash-errors-log

即日志中如果有"error"关键字的日志,进行多行合并,发送至kafka.

经验证,在日志不断输入的情况,会把不含"error"的行也进行合并,日志有间隔的情况输入,过滤效果比较好,具体结合业务情况实用吧。

总之一句话,filebeat可以多行合并和进行关键字日志采集。



本文转自 jackjiaxiong 51CTO博客,原文链接:http://blog.51cto.com/xiangcun168/1941401

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
监控 测试技术 开发者
一行代码改进:Logtail的多行日志采集性能提升7倍的奥秘
一个有趣的现象引起了作者的注意:当启用行首正则表达式处理多行日志时,采集性能出现下降。究竟是什么因素导致了这种现象?本文将探索Logtail多行日志采集性能提升的秘密。
170 23
|
4月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
82 1
|
5月前
|
Kubernetes API Docker
跟着iLogtail学习容器运行时与K8s下日志采集方案
iLogtail 作为开源可观测数据采集器,对 Kubernetes 环境下日志采集有着非常好的支持,本文跟随 iLogtail 的脚步,了解容器运行时与 K8s 下日志数据采集原理。
|
6月前
|
存储 Kubernetes Java
在k8S中,容器内日志是怎么采集的?
在k8S中,容器内日志是怎么采集的?
|
6月前
|
数据采集 监控 Kubernetes
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
|
6月前
|
数据采集 Kubernetes Java
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
|
6月前
|
存储 容器
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
|
6月前
|
容器
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的
|
6月前
|
存储 数据采集 容器
Job类日志采集问题之DaemonSet采集方式在Job日志采集上如何表现
Job类日志采集问题之DaemonSet采集方式在Job日志采集上如何表现
|
6月前
|
存储 Kubernetes 数据处理
Job类日志采集问题之为什么Job容器的日志采集要考虑容器发现速度和开始采集延时,如何理解
Job类日志采集问题之为什么Job容器的日志采集要考虑容器发现速度和开始采集延时,如何理解

热门文章

最新文章