GeoIP 地址查询归类
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
配置示例
filter { geoip { source => "message" } }
运行结果
{ "message" => "183.60.92.253", "@version" => "1", "@timestamp" => "2014-08-07T10:32:55.610Z", "host" => "raochenlindeMacBook-Air.local", "geoip" => { "ip" => "183.60.92.253", "country_code2" => "CN", "country_code3" => "CHN", "country_name" => "China", "continent_code" => "AS", "region_name" => "30", "city_name" => "Guangzhou", "latitude" => 23.11670000000001, "longitude" => 113.25, "timezone" => "Asia/Chongqing", "real_region_name" => "Guangdong", "location" => [ [0] 113.25, [1] 23.11670000000001 ] } }
配置说明
GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields
选项指定自己所需要的。下例为全部可选内容:
filter { geoip { fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"] } }
需要注意的是:geoip.location
是 logstash 通过 latitude
和 longitude
额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:
filter { geoip { fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] remove_field => ["[geoip][latitude]", "[geoip][longitude]"] } } ```
小贴士
geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。
所以读者在测试时,如果使用了诸如 127.0.0.1, 172.16.0.1, 182.168.0.1, 10.0.0.1 等内网地址,会发现没有对应输出!
JSON 编解码
在上一章,已经讲过在 codec 中使用 JSON 编码。但是,有些日志可能是一种复合的数据结构,其中只是一部分记录是 JSON 格式的。这时候,我们依然需要在 filter 阶段,单独启用 JSON 解码插件。
配置示例
filter { json { source => "message" target => "jsoncontent" } }
运行结果
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "jsoncontent": { "uid": 3081609001, "type": "signal" } }
小贴士
如果不打算使用多层结构的话,删掉 target
配置即可。新的结果如下:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "uid": 3081609001, "type": "signal" }
split 拆分事件
上一章我们通过 multiline 插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是 split 插件。
配置示例
filter { split { field => "message" terminator => "#" } }
运行结果
这个测试中,我们在 intputs/stdin 的终端中输入一行数据:"test1#test2",结果看到输出两个事件:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test1" } { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test2" }
重要提示
split 插件中使用的是 yield 功能,其结果是 split 出来的新事件,会直接结束其在 filter 阶段的历程,也就是说写在 split 后面的其他 filter 插件都不起作用,进入到 output 阶段。所以,一定要保证 split 配置写在全部 filter 配置的最后。
使用了类似功能的还有 clone 插件。
注:从 logstash-1.5.0beta1 版本以后修复该问题。
UserAgent 匹配归类
配置示例
filter { useragent { target => "ua" source => "useragent" } }
Key-Value 切分
在很多情况下,日志内容本身都是一个类似于 key-value 的格式,但是格式具体的样式却是多种多样的。logstash 提供 filters/kv
插件,帮助处理不同样式的 key-value 日志,变成实际的 LogStash::Event 数据。
配置示例
filter { ruby { init => "@kname = ['method','uri','verb']" code => "event.append(Hash[@kname.zip(event['request'].split(' '))])" } if [uri] { ruby { init => "@kname = ['url_path','url_args']" code => "event.append(Hash[@kname.zip(event['uri'].split('?'))])" } kv { prefix => "url_" source => "url_args" field_split => "&" remove_field => [ "url_args", "uri", "request" ] } } }
解释
Nginx 访问日志中的 $request
,通过这段配置,可以详细切分成 method
, url_path
, verb
, url_a
, url_b
...
随心所欲的 Ruby 处理
如果你稍微懂那么一点点 Ruby 语法的话,filters/ruby 插件将会是一个非常有用的工具。
比如你需要稍微修改一下 LogStash::Event
对象,但是又不打算为此写一个完整的插件,用 filters/ruby 插件绝对感觉良好。
配置示例
filter { ruby { init => "@kname = ['client','servername','url','status','time','size','upstream','upstreamstatus','upstreamtime','referer','xff','useragent']" code => "event.append(Hash[@kname.zip(event['message'].split('|'))])" } }
官网示例是一个比较有趣但是没啥大用的做法 —— 随机取消 90% 的事件。
所以上面我们给出了一个有用而且强大的实例。
解释
通常我们都是用 filters/grok 插件来捕获字段的,但是正则耗费大量的 CPU 资源,很容易成为 Logstash 进程的瓶颈。
而实际上,很多流经 Logstash 的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。
filters/mutate 插件里的 "split" 选项只能切成数组,后续很不方便使用和识别。而在 filters/ruby 里,我们可以通过 "init" 参数预定义好由每个新字段的名字组成的数组,然后在 "code" 参数指定的 Ruby 语句里通过两个数组的 zip 操作生成一个哈希并添加进数组里。短短一行 Ruby 代码,可以减少 50% 以上的 CPU 使用率。
filters/ruby 插件用途远不止这一点,下一节你还会继续见到它的身影。
更多实例
2014 年 09 年 23 日新增
filter{ date { match => ["datetime" , "UNIX"] } ruby { code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs" } }
在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致 Elasticsearch 集群出现问题。
当数据格式发生变化,比如 UNIX 时间格式变成 UNIX_MS 时间格式,会导致 logstash 疯狂创建新索引,集群崩溃。
或者误输入过老的数据时,因为一般我们会 close 几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。
这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。
数值统计(Metrics)
filters/metrics 插件是使用 Ruby 的 Metriks 模块来实现在内存里实时的计数和采样分析。该模块支持两个类型的数值分析:meter 和 timer。下面分别举例说明:
Meter 示例(速率阈值检测)
web 访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法,是通过 logstash 或者其他日志分析脚本,把计数发送到 rrdtool 或者 graphite 里面。然后再通过 check_graphite 脚本之类的东西来检查异常并报警。
事实上这个事情可以直接在 logstash 内部就完成。比如如果最近一分钟 504 请求的个数超过 100 个就报警:
filter { metrics { meter => "error.%{status}" add_tag => "metric" ignore_older_than => 10 } if "metric" in [tags] { ruby { code => "event.cancel if event['error.504.rate_1m'] * 60 < 100" } } } output { if "metric" in [tags] { exec { command => "echo \"Out of threshold: %{error.504.rate_1m}\"" } } }
这里需要注意 *60
的含义。
metriks 模块生成的 rate_1m/5m/15m 意思是:最近 1,5,15 分钟的每秒速率!
Timer 示例(box and whisker 异常检测)
官版的 filters/metrics 插件只适用于 metric 事件的检查。由插件生成的新事件内部不存有来自 input 区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在 metric 的 timer 事件里加一个属性,存储最近一个实际事件的数值:github.com/chenryn/log…
然后我们就可以用如下配置来探测异常数据了:
filter { metrics { timer => {"rt" => "%{request_time}"} percentiles => [25, 75] add_tag => "percentile" } if "percentile" in [tags] { ruby { code => "l=event['rt.p75']-event['rt.p25'];event['rt.low']=event['rt.p25']-l;event['rt.high']=event['rt.p75']+l" } } } output { if "percentile" in [tags] and ([rt.last] > [rt.high] or [rt.last] < [rt.low]) { exec { command => "echo \"Anomaly: %{rt.last}\"" } } }