《ELK Stack权威指南(第2版)》一2.3 过滤器配置

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云解析 DNS,旗舰版 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介:

本节书摘来自华章出版社《ELK Stack权威指南(第2版)》一书中的第2章,第2.3节,作者 饶琛琳  更多章节内容可以访问云栖社区“华章计算机”公众号查看。 


2.3 过滤器配置

有丰富的过滤器插件,是Logstash威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。下面我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有地添加新的Logstash事件到后续的流程中去!

2.3.1 date时间处理

之前章节已经提过,logstash-filter-date插件可以用来转换你的日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。

因为在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用logstash-filter-date转换后删除自己的字段!

这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。

强烈建议打开Nginx的access_log配置项的buffer参数,对极限响应性能有极大提升!

1.配置示例

logstash-filter-date插件支持五种时间格式:

ISO8601:类似“2011-04-19T03:44:01.103Z”这样的格式。具体Z后面可以有“08:00”也可以没有,“.103”这个也可以没有。常用场景里来说,Nginx的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。

UNIX:UNIX时间戳格式,记录的是从1970年起始至今的总秒数。Squid默认日志格式中就使用了这种格式。

UNIX_MS:这个时间戳则是从1970年起始至今的总毫秒数。据我所知,JavaScript里经常使用这个时间格式。

TAI64N:TAI64N格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。

Joda-Time库:Logstash内部使用了Java的Joda时间库来作时间处理。所以我们可以使用Joda库所支持的时间格式来作具体定义。Joda时间格式定义见表2-1。

表2-1 Joda时间库格式

格式符  含  义     描  述     示  例


2.时区问题的解释

很多中国用户经常提一个问题:为什么@timestamp比我们晚了8个小时?怎么修改成北京时间?

其实,Elasticsearch内部,对时间类型字段,是统一采用UTC时间,存成long长整形数据的!对日志统一采用UTC时间存储,是国际安全/运维界的一个通识—欧美公司的服务器普遍广泛分布在多个时区里—不像中国,地域横跨五个时区却只用北京时间。

对于页面查看,ELK的解决方案是在Kibana上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。

所以,建议大家接受这种设定。否则,即便你用.getLocalTime修改,也还要面临在Kibana过去修改,以及Elasticsearch原有的["now-1h" TO "now"]这种方便的搜索语句无法正常使用的尴尬。

以上,请读者自行斟酌。

2.3.2 grok正则捕获

grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

1.正则表达式语法

运维工程师多多少少都会一点正则。你可以在grok里写标准的正则,像下面这样:

\s+(?<request_time>\d+(?:\.\d+)?)\s+

这个正则表达式写法对于Perl或者Ruby程序员应该很熟悉了,Python程序员可能更习惯写(?P<name>pattern),没办法,适应一下吧。

现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(Logstash执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):


2. grok表达式语法

grok支持把预定义的grok表达式写入到文件中,官方提供的预定义grok表达式见:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

下面是从官方文件中摘抄的最简单但是足够说明用法的示例:

USERNAME [a-zA-Z0-9._-]+

USER %{USERNAME}

第一行,用普通的正则表达式来定义一个grok表达式;第二行,通过打印赋值格式(sprintf format),用前面定义好的grok表达式来定义另一个grok表达式。

grok表达式的打印复制格式的完整语法见下行示例。其中data_type目前只支持两个值:int和float。

%{PATTERN_NAME:capture_name:data_type}

所以我们可以改进我们的配置成下面这样:



3.最佳实践

实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的grok表达式统一写入到一个地方。然后用filter/grok的patterns_dir选项来指明。

如果你把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。

重写参数的示例如下:

filter {

    grok {

        patterns_dir =>["/path/to/your/own/patterns"]

        match => {

        "message" =>"%{SYSLOGBASE} %{DATA:message}"

        }

        overwrite => ["message"]

    }

}

更多有关grok正则性能的最佳实践(比如timeout_millis等配置参数),请参考:https://www.elastic.co/blog/do-you-grok-grok。

4.高级用法

多行匹配 在和codec/multiline搭配使用的时候,需要注意一个问题,grok正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要=~ // m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记。如下所示:

match => {

"message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"

}

多项选择 有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用|隔开又比较丑陋。这时候,Logstash的语法提供给我们一个有趣的解决方式。

文档中,都说明logstash-filters-grok插件的match参数应该接受的是一个Hash值。但是因为早期的Logstash语法中Hash值也是用[]这种方式书写的,所以其实现在传递Array值给match参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:

match => [

"message", "(?<request_time>\d+(?:\.\d+)?)",

"message", "%{SYSLOGBASE} %{DATA:message}",

"message", "(?m)%{WORD}"

]

Logstash会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用|分割写个大大的正则是一样的,但是可阅读性好了很多。

我强烈建议每个人都要使用Grok Debugger (http://grokdebug.herokuapp.com/)来调试自己的grok表达式。

2.3.3 dissect解析

grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数情况下日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位且重复性较大的时候,可以使用dissect插件更快地完成解析工作。下面是解析syslog的示例。

filter {

    dissect {

        mapping => {

            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"

        }

        convert_datatype => {

            pid => "int"

        }

    }

}

我们看到上面使用了和Grok很类似的%{}语法来表示字段,这显然是基于习惯延续的考虑。不过示例中%{+ts}的加号就不一般了。dissect除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:

%{+key} 这个+表示前面已经捕获到一个key字段了,而这次捕获的内容自动添补到之前key字段内容的后面。

%{+key/2} 这个/2表示在有多次捕获内容都填到key字段里的时候,拼接字符串的顺序谁前谁后。/2表示排第2位。

%{?string} 这个?表示这块只是一个占位,并不会实际生成捕获字段存到Event里面。

%{?string} %{&string} 当同样捕获名称都是string,但是一个?和一个&在一起的时候,表示这是一个键值对。

比如对http://rizhiyi.com/index.do?id=123写这么一段配置:

http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

则最终生成的 Event 内容是这样的:

{

    domain => "rizhiyi.com",

    id => "123"

}

2.3.4 GeoIP地址查询

GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别、省市、经纬度等,对于可视化地图和区域统计非常有用。

配置示例如下:



2.3.5 JSON编解码

在上一章,已经讲过在Codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只有一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。

配置示例如下:



2.3.6 key-value切分

在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。Logstash提供logstash-filter-kv插件,帮助处理不同样式的key-value日志,变成实际的LogStash::Event数据。

配置示例如下:



2.3.7 metrics数值统计

logstash-filter-metrics插件是使用Ruby的Metriks模块来实现在内存里实时地计数和采样分析。该模块支持两个类型的数值分析:meter和timer。下面分别举例说明。

1. Meter示例(速率阈值检测)

Web访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法是通过Logstash或者其他日志分析脚本,把计数发送到rrdtool或者graphite里面,然后再通过check_graphite脚本之类的东西来检查异常并报警。

事实上这个事情可以直接在Logstash内部就完成。比如如果最近一分钟504请求的个数超过100个就报警,如下所示:



2. Timer示例(box and whisker异常检测)

官版的logstash-filter-metrics插件只适用于metric事件的检查。由插件生成的新事件内部不存有来自input区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在metric的timer事件里加一个属性,存储最近一个实际事件的数值:



2.3.8 mutate数据修改

logstash-filter-mutate插件是Logstash另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换、字符串处理和字段处理等。

1.类型转换

类型转换是logstash-filter-mutate插件最初诞生时的唯一功能,其应用场景在之前的2.3.5节“JSON编解码”中已经提到。

可以设置的转换类型包括:“integer”、“float”和“string”。示例如下:

filter {

    mutate {

        convert => ["request_time", "float"]

    }

}

mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将[“1”,“2”]转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的logstash-filter-ruby插件完成。

2. 字符串处理

有如下字符串处理的插件:


3.字段处理

字段处理的插件有:                           

rename:重命名某个字段,如果目的字段已经存在,会被覆盖掉,如下所示:

filter {

    mutate {

        rename => ["syslog_host", "host"]

    }

}

update:更新某个字段的内容。如果字段不存在,不会新建。

replace:作用和update类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。

4.执行次序

需要注意的是,filter/mutate内部是有执行次序的。其次序如下:



2.3.9 随心所欲的Ruby处理

如果你稍微懂那么一点点Ruby语法的话,logstash-filter-ruby插件将会是一个非常有用的工具。比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用logstash-filter-ruby插件绝对感觉良好。

配置示例如下:

filter {

    ruby {

        init =>"@kname = ['client','servername','url','status','time','size','upstream',

            'upstreamstatus','upstreamtime','referer','xff','useragent']"

        code => "

            ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

            new_event.remove('@timestamp')

        event.append(new_event)"

    }

}

官网示例是一个比较有趣但是没啥大用的做法—随机取消90%的事件。

所以上面我们给出了一个有用而且强大的实例。

通常我们都是用logstash-filter-grok插件来捕获字段的,但是正则耗费大量的CPU资源,很容易成为Logstash进程的瓶颈。

而实际上,很多流经Logstash的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。

从Logstash2.3开始,LogStash::event.append不再直接接受Hash对象,而必须是LogStash:: Event对象。所以示例变成要先初始化一个新的event,再把无用的@timestamp移除,再append进去。否则会把@timestamp变成有两个时间的数组了!

  从Logstash 5.0开始,LogStash::Event改为Java实现,直接使用event["parent"]["child"]形式获取的,不是原事件的引用而是复制品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。

logstash-filter-mutate插件里的“split”选项只能切成数组,后续很不方便使用和识别。而在logstash-filter-ruby里,我们可以通过“init”参数预定义好由每个新字段的名字组成的数组,然后在“code”参数指定的Ruby语句里通过两个数组的zip操作生成一个哈希并添加进数组里。短短一行Ruby代码,可以减少50%以上的CPU使用率。

logstash-filter-ruby插件用途远不止这一点,下一节你还会继续见到它的身影。

更多实例如下:

filter{

    date {

        match => ["datetime" , "UNIX"]

    }

    ruby {

        code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"

    }

}

在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致Elasticsearch集群出现问题。

当数据格式发生变化,比如UNIX时间格式变成UNIX_MS时间格式,会导致Logstash疯狂创建新索引,集群崩溃。

或者误输入过老的数据时,因为一般我们会close几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。

这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。

2.3.10 split拆分事件

上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。

配置示例如下:

filter {

    split {

        field =>"message"

        terminator =>"#"

    }

}

这个测试中,我们在intputs/stdin的终端中输入一行数据:“test1#test2”,结果看到输出两个事件:



2.3.11 交叉日志合并

Splunk有一项非常有用的功能,叫做transaction。可以在错乱的多行日志中,根据connected字段、maxspan窗口、startswith/endwith标签等信息计算出事件的duration和count结果。其文档见:http://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Transaction

ELK中,承载计算功能的Elasticsearch并不支持这种跨行计算,所以,变通的处理方式是:在Logstash中,提前做好事件的归并,直接计算出来transaction的duration数据。

比如一个transaction task_id startswith=START endwith=END的查询,可以在Logstash中这样计算:




相关文章
|
6月前
|
存储 Prometheus 监控
Prometheus vs. ELK Stack:容器监控与日志管理工具的较量
随着容器化技术的广泛应用,容器监控与日志管理成为了关键任务。本文将对两种常用工具进行比较与选择,分别是Prometheus和ELK Stack。Prometheus是一款开源的监控系统,专注于时序数据的收集和告警。而ELK Stack则是一套完整的日志管理解决方案,由Elasticsearch、Logstash和Kibana三个组件组成。通过比较它们的特点、优势和适用场景,读者可以更好地了解如何选择适合自己需求的工具。
|
6月前
|
Go 数据处理 Docker
elk stack部署自动化日志收集分析平台
elk stack部署自动化日志收集分析平台
174 0
|
6月前
|
Prometheus 监控 Cloud Native
Prometheus VS ELK Stack:容器监控与日志管理工具的比较与选择
在容器化时代,有效的容器监控与日志管理工具对于确保应用程序的可靠性和可维护性至关重要。本文将比较两个主流工具,Prometheus和ELK Stack,探讨它们在容器监控和日志管理方面的特点、优势和适用场景,帮助读者做出明智的选择。
|
存储 数据可视化 Linux
分布式系列教程(42) -ELK配置与使用
分布式系列教程(42) -ELK配置与使用
344 0
|
存储 监控 安全
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
1033 0
|
消息中间件 缓存 负载均衡
【日志架构】ELK Stack + Kafka 端到端练习
【日志架构】ELK Stack + Kafka 端到端练习
|
消息中间件 监控 固态存储
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
121 0
BXA
|
Prometheus Kubernetes 监控
搭建高效微服务架构:Kubernetes、Prometheus和ELK Stack的完美组合
微服务架构是一种软件设计模式,它将单个应用程序拆分成一组更小、更独立的服务。每个服务在自己的进程中运行,并使用轻量级通信机制进行通信。由于每个服务都是独立的,因此可以独立部署、扩展和更新,从而使开发和运维更加容易。
BXA
429 0
|
消息中间件 数据采集 存储
客户端同学应该理解的 ELK Stack 组件知识
客户端同学应该理解的 ELK Stack 组件知识
193 0
客户端同学应该理解的 ELK Stack 组件知识
下一篇
无影云桌面