ELK技术栈 - logstash学习笔记(六)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

GeoIP 地址查询归类


GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。


配置示例


filter {
    geoip {
        source => "message"
    }
}

运行结果


{
       "message" => "183.60.92.253",
      "@version" => "1",
    "@timestamp" => "2014-08-07T10:32:55.610Z",
          "host" => "raochenlindeMacBook-Air.local",
         "geoip" => {
                      "ip" => "183.60.92.253",
           "country_code2" => "CN",
           "country_code3" => "CHN",
            "country_name" => "China",
          "continent_code" => "AS",
             "region_name" => "30",
               "city_name" => "Guangzhou",
                "latitude" => 23.11670000000001,
               "longitude" => 113.25,
                "timezone" => "Asia/Chongqing",
        "real_region_name" => "Guangdong",
                "location" => [
            [0] 113.25,
            [1] 23.11670000000001
        ]
    }
}

配置说明


GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的。下例为全部可选内容:


filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}

需要注意的是:geoip.location 是 logstash 通过 latitudelongitude 额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:


filter { geoip { fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] remove_field => ["[geoip][latitude]", "[geoip][longitude]"] } } ```


小贴士


geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。


所以读者在测试时,如果使用了诸如 127.0.0.1, 172.16.0.1, 182.168.0.1, 10.0.0.1 等内网地址,会发现没有对应输出!


JSON 编解码


在上一章,已经讲过在 codec 中使用 JSON 编码。但是,有些日志可能是一种复合的数据结构,其中只是一部分记录是 JSON 格式的。这时候,我们依然需要在 filter 阶段,单独启用 JSON 解码插件。


配置示例


filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}

运行结果


{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}

小贴士


如果不打算使用多层结构的话,删掉 target 配置即可。新的结果如下:


{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "uid": 3081609001,
    "type": "signal"
}

split 拆分事件


上一章我们通过 multiline 插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是 split 插件。


配置示例


filter {
    split {
        field => "message"
        terminator => "#"
    }
}

运行结果


这个测试中,我们在 intputs/stdin 的终端中输入一行数据:"test1#test2",结果看到输出两个事件:


{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test1"
}
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test2"
}

重要提示


split 插件中使用的是 yield 功能,其结果是 split 出来的新事件,会直接结束其在 filter 阶段的历程,也就是说写在 split 后面的其他 filter 插件都不起作用,进入到 output 阶段。所以,一定要保证 split 配置写在全部 filter 配置的最后


使用了类似功能的还有 clone 插件。


注:从 logstash-1.5.0beta1 版本以后修复该问题。


UserAgent 匹配归类


配置示例


filter {
    useragent {
        target => "ua"
        source => "useragent"
    }
}

Key-Value 切分


在很多情况下,日志内容本身都是一个类似于 key-value 的格式,但是格式具体的样式却是多种多样的。logstash 提供 filters/kv 插件,帮助处理不同样式的 key-value 日志,变成实际的 LogStash::Event 数据。


配置示例


filter {
    ruby {
        init => "@kname = ['method','uri','verb']"
        code => "event.append(Hash[@kname.zip(event['request'].split(' '))])"
    }
    if [uri] {
        ruby {
            init => "@kname = ['url_path','url_args']"
            code => "event.append(Hash[@kname.zip(event['uri'].split('?'))])"
        }
        kv {
            prefix => "url_"
            source => "url_args"
            field_split => "&"
            remove_field => [ "url_args", "uri", "request" ]
        }
    }
}

解释


Nginx 访问日志中的 $request,通过这段配置,可以详细切分成 method, url_path, verb, url_a, url_b ...


随心所欲的 Ruby 处理


如果你稍微懂那么一点点 Ruby 语法的话,filters/ruby 插件将会是一个非常有用的工具。


比如你需要稍微修改一下 LogStash::Event 对象,但是又不打算为此写一个完整的插件,用 filters/ruby 插件绝对感觉良好。


配置示例


filter {
    ruby {
        init => "@kname = ['client','servername','url','status','time','size','upstream','upstreamstatus','upstreamtime','referer','xff','useragent']"
        code => "event.append(Hash[@kname.zip(event['message'].split('|'))])"
    }
}

官网示例是一个比较有趣但是没啥大用的做法 —— 随机取消 90% 的事件。


所以上面我们给出了一个有用而且强大的实例。


解释


通常我们都是用 filters/grok 插件来捕获字段的,但是正则耗费大量的 CPU 资源,很容易成为 Logstash 进程的瓶颈。


而实际上,很多流经 Logstash 的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。


filters/mutate 插件里的 "split" 选项只能切成数组,后续很不方便使用和识别。而在 filters/ruby 里,我们可以通过 "init" 参数预定义好由每个新字段的名字组成的数组,然后在 "code" 参数指定的 Ruby 语句里通过两个数组的 zip 操作生成一个哈希并添加进数组里。短短一行 Ruby 代码,可以减少 50% 以上的 CPU 使用率。


filters/ruby 插件用途远不止这一点,下一节你还会继续见到它的身影。


更多实例


2014 年 09 年 23 日新增


filter{
    date {
        match => ["datetime" , "UNIX"]
    }
    ruby {
        code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"
    }
}


在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致 Elasticsearch 集群出现问题。


当数据格式发生变化,比如 UNIX 时间格式变成 UNIX_MS 时间格式,会导致 logstash 疯狂创建新索引,集群崩溃。


或者误输入过老的数据时,因为一般我们会 close 几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。


这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。


数值统计(Metrics)


filters/metrics 插件是使用 Ruby 的 Metriks 模块来实现在内存里实时的计数和采样分析。该模块支持两个类型的数值分析:meter 和 timer。下面分别举例说明:


Meter 示例(速率阈值检测)


web 访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法,是通过 logstash 或者其他日志分析脚本,把计数发送到 rrdtool 或者 graphite 里面。然后再通过 check_graphite 脚本之类的东西来检查异常并报警。


事实上这个事情可以直接在 logstash 内部就完成。比如如果最近一分钟 504 请求的个数超过 100 个就报警:


filter {
    metrics {
        meter => "error.%{status}"
        add_tag => "metric"
        ignore_older_than => 10
    }
    if "metric" in [tags] {
        ruby {
            code => "event.cancel if event['error.504.rate_1m'] * 60 < 100"
        }
    }
}
output {
    if "metric" in [tags] {
        exec {
            command => "echo \"Out of threshold: %{error.504.rate_1m}\""
        }
    }
}

这里需要注意 *60 的含义。


metriks 模块生成的 rate_1m/5m/15m 意思是:最近 1,5,15 分钟的每秒速率!


Timer 示例(box and whisker 异常检测)


官版的 filters/metrics 插件只适用于 metric 事件的检查。由插件生成的新事件内部不存有来自 input 区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在 metric 的 timer 事件里加一个属性,存储最近一个实际事件的数值:github.com/chenryn/log…


然后我们就可以用如下配置来探测异常数据了:


filter {
    metrics {
        timer => {"rt" => "%{request_time}"}
        percentiles => [25, 75]
        add_tag => "percentile"
    }
    if "percentile" in [tags] {
        ruby {
            code => "l=event['rt.p75']-event['rt.p25'];event['rt.low']=event['rt.p25']-l;event['rt.high']=event['rt.p75']+l"
        }
    }
}
output {
    if "percentile" in [tags] and ([rt.last] > [rt.high] or [rt.last] < [rt.low]) {
        exec {
            command => "echo \"Anomaly: %{rt.last}\""
        }
    }
}

小贴士:有关 box and shisker plot 内容和重要性,参见《数据之魅》一书。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
5月前
|
消息中间件 存储 运维
一文吃透企业级elk技术栈:1. 基础优化
一文吃透企业级elk技术栈:1. 基础优化
|
5月前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
5月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:7. 验证结果
一文吃透企业级elk技术栈:7. 验证结果
|
2月前
|
存储 JSON Java
ELK 圣经:Elasticsearch、Logstash、Kibana 从入门到精通
ELK是一套强大的日志管理和分析工具,广泛应用于日志监控、故障排查、业务分析等场景。本文档将详细介绍ELK的各个组件及其配置方法,帮助读者从零开始掌握ELK的使用。
|
2月前
|
存储 监控 安全
|
5月前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
一文吃透企业级elk技术栈:2. ES集群搭建
一文吃透企业级elk技术栈:2. ES集群搭建
|
5月前
|
监控 关系型数据库 MySQL
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
5月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:6. filebeat安装配置
一文吃透企业级elk技术栈:6. filebeat安装配置