Many colleagues think that Filebeat cannot handle multi-line logs when collecting them. Today we take a look at Filebeat's multiline and include_lines options.
Let's start with a simple case. Given the following log, we only want to collect the entries that contain "error":
2017/06/22 11:26:30 [error] 26067#0: *17918 connect() failed (111: Connection refused) while connecting to upstream, client: 192.168.32.17, server: localhost, request: "GET /wss/ HTTP/1.1", upstream: "http://192.168.12.106:8010/", host: "192.168.12.106"
2017/06/22 11:26:30 [info] 26067#0:
2017/06/22 12:05:10 [error] 26067#0: *17922 open() "/data/programs/nginx/html/ws" failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: "GET /ws HTTP/1.1", host: "192.168.12.106"
The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: ['error']
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
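include_lines is a list of regular expressions; only lines that match at least one of them are shipped. For illustration, here is a minimal Python sketch of that filtering idea applied to the sample log above (it only mimics the behaviour, it is not Filebeat's actual code):

import re

# mirrors include_lines: ['error']
include_patterns = [re.compile('error')]

def keep(line):
    # keep a line if any include pattern matches it
    return any(p.search(line) for p in include_patterns)

with open('/tmp/test.log') as f:        # same path as in the config above
    for line in f:
        if keep(line):
            print(line.rstrip('\n'))    # only the [error] lines are printed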
Check the Kafka topic. Sure enough, only the log entries containing the "error" keyword were collected:
{"@timestamp":"2017-06-23T08:57:25.227Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":30926,"source":"/tmp/test.log","type":"log"}
{"@timestamp":"2017-06-23T08:57:32.228Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":31342,"source":"/tmp/test.log","type":"log"}
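For reference, records like the two above can be read back with any Kafka consumer. A small sketch using the kafka-python client (an assumption on my side; the console consumer bundled with Kafka works just as well):

import json
from kafka import KafkaConsumer   # pip install kafka-python

consumer = KafkaConsumer(
    'logstash-errors-log',                    # topic from the config above
    bootstrap_servers='192.168.12.105:9092',  # broker from the config above
    auto_offset_reset='earliest',             # read from the beginning
)

for record in consumer:
    event = json.loads(record.value)          # Filebeat ships JSON documents
    print(event['@timestamp'], event['message'])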
Now a multi-line case:
[2016-05-25 12:39:04,744][DEBUG][action.bulk ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}
MapperParsingException[Field name [events.created] cannot contain '.']
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  multiline:
    pattern: '^\['
    negate: true
    match: after
  fields:
    beat.name: 192.168.12.106
  fields_under_root: true
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
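With pattern: '^\[', negate: true and match: after, every line that does not start with '[' is appended to the previous line that does, so a whole stack trace becomes one event. A rough Python sketch of that grouping logic (a simplification; it ignores Filebeat's max_lines and timeout handling):

import re

start_pattern = re.compile(r'^\[')   # same as multiline.pattern above

def merge_multiline(lines):
    # negate: true + match: after -> lines NOT matching the pattern
    # are appended to the previous matching line
    event = None
    for line in lines:
        line = line.rstrip('\n')
        if start_pattern.search(line):   # a new event starts here
            if event is not None:
                yield event
            event = line
        elif event is not None:          # continuation line
            event += '\n' + line
    if event is not None:
        yield event

with open('/tmp/test.log') as f:
    for ev in merge_multiline(f):
        print(repr(ev))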
The Kafka topic now looks like this:
{"@timestamp":"2017-06-23T09:09:02.887Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"[2016-05-25 12:39:04,744][DEBUG][action.bulk ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}\n
MapperParsingException[Field name [events.created] cannot contain '.']\n
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)\n
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)\n
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)\n
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n
at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)\n
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)\n
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)\n
at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)\n
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)\n
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)\n
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)\n
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)\n
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)\n
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)\n
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n
at java.lang.Thread.run(Thread.java:745)\n\n\n\n","offset":35737,"source":"/tmp/test.log","type":"log"}
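The downstream consumer therefore receives the whole stack trace as one document, and the original lines can be recovered by splitting the message field on the embedded newlines. A tiny sketch (record_value here is a hypothetical variable standing for one Kafka message payload like the one above):

import json

def split_event(record_value):
    # record_value: the raw JSON payload of one Kafka message
    event = json.loads(record_value)
    return event['@timestamp'], event['message'].split('\n')

# e.g. inside the consumer loop shown earlier:
# ts, lines = split_event(record.value)
# print(ts, len(lines), 'lines merged into this event')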
You can see that multiline has merged the multi-line entry into a single event.
Next, use multiline and include_lines together.
The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: ['error']
  multiline:
    pattern: '^\['
    negate: true
    match: after
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
That is, log entries containing the "error" keyword are merged across lines and sent to Kafka.
In testing, when the log is written continuously, lines that do not contain "error" also end up merged into the events; when entries arrive with some interval between them, the filtering works much better. Apply it according to your own workload.
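Conceptually the combination can be read as "merge the lines into events first, then keep only the events that mention error". The sketch below illustrates that reading; whether Filebeat applies the two steps in exactly this order may depend on the version, so treat it only as a mental model, not as Filebeat's implementation:

import re

start_pat = re.compile(r'^\[')    # multiline.pattern
error_pat = re.compile('error')   # include_lines: ['error']

events, current = [], None
with open('/tmp/test.log') as f:
    for line in f:
        line = line.rstrip('\n')
        if start_pat.search(line):        # new event (negate: true)
            if current is not None:
                events.append(current)
            current = line
        elif current is not None:         # continuation (match: after)
            current += '\n' + line
if current is not None:
    events.append(current)

for ev in events:
    if error_pat.search(ev):              # keep only events containing 'error'
        print(ev)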
In short: Filebeat can both merge multi-line log entries and collect logs by keyword.
This article is reposted from jackjiaxiong's 51CTO blog. Original link: http://blog.51cto.com/xiangcun168/1941401