《ELK Stack权威指南 》第3章 场景示例

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:
本节书摘来自华章出版社《ELK Stack权威指南 》一书中的第1章,第3节,作者饶琛琳,更多章节内容可以访问云栖社区“华章计算机”公众号查看。


场 景 示 例

前面虽然介绍了几十个Logstash插件的常见配置项,但是过多的选择下,如何组合使用这些插件,依然是一部分用户的难题。本章将列举一些最常见的日志场景,演示针对性的组件搭配,希望能给读者带来启发。

本章介绍的场景包括:Nginx访问日志、Nginx错误日志、Postfix日志、Ossec日志、Windows系统日志、Java日志、MySQL慢查询日志、Docker容器日志。

3.1 Nginx访问日志

访问日志处理分析绝对是使用ELK stack时最常见的需求。默认的处理方式下,性能和精确度都不够好。本节会列举对Nginx访问日志的几种不同处理方式,并阐明其优劣。

3.1.1 grok处理方式

Logstash默认自带了Apache标准日志的grok正则表达式:

COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{NOTSPACE:auth}\[%{HTTPDATE:

    timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?

    |%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)

COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

对于Nginx标准日志格式,可以发现只是最后多了一个$http_x_forwarded_for变量。所以Nginx标准日志的grok正则定义是:

MAINNGINXLOG %{COMBINEDAPACHELOG} %{QS:x_forwarded_for}

自定义的日志格式,可以照此修改。

3.1.2 split处理方式

Nginx日志因为部分变量中内含空格,所以很多时候只能使用%{QS}正则来做分隔,性能和细度都不太好。如果能自定义一个比较少见的字符作为分隔符,那么处理起来就简单多了。假设定义的日志格式如下:

log_format main "$http_x_forwarded_for | $time_local | $request | $status |

    $body_bytes_sent | "

"$request_body | $content_length | $http_referer | $http_user_agent | $nuid | "

"$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_

    time | $request_time";

实际日志如下:

117.136.9.248 | 08/Apr/2015:16:00:01 +0800 | POST /notice/newmessage?sign=cba4f614e05db285850cadc696fcdad0&token=JAGQ92Mjs3--gik_b_DsPIQHcyMKYGpD&did=b749736ac70f12df700b18cd6d051d5&osn=android&osv=4.0.4&appv=3.0.1&net=460-02-2g&longitude=120.393006&latitude=36.178329&ch=360&lp=1&ver=1&ts=1428479998151&im=869736012353958&sw=0&sh=0&la=zh-CN&lm=weixin&dt=vivoS11tHTTP/1.1| 200 | 132 | abcd-sign-v1://dd03c57f8cb6f1cef919ab5df66f2903f:d51asq5yslwnyz5t/{\x22type\x22:4,\x22uid\x22:7567306} | 89 | - | abcd/3.0.1, Android/4.0.4, vivo S11t | nuid=0C0A0A0A01E02455EA7CF47E02FD072C1428480001.157| - | 10.10.10.13 | bnx02.abcdprivate.com | 10.10.10.22:9999 | 0.022 | 0.022 59.50.44.53 | 08/Apr/2015:16:00:01 +0800 | POST /feed/pubList?appv=3.0.3&did=89da72550de488328e2aba5d97850e9f&dt=iPhone6%2C2&im=B48C21F3-487E-4071-9742-DC6D61710888&la=cn&latitude=0.000000&lm=weixin&longitude=0.000000&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=568.000000&sw=320.000000&token=7NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J&ts=1428480001275 HTTP/1.1 | 200 | 983 | abcd-sign-v1://b398870a0b25b29aae65cd553addc43d:72214ee85d7cca22/{\x22nextkey\x22:\x22\x22,\x22uid\x22:\x2213062545\x22,\x22token\x22:\x227NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J\x22}| 139 | - | Shopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00) | nuid=0C0A0A0A81-DF2455017D548502E48E2E1428480001.154 | nuid=CgoKDFUk34GFVH0BLo7kAg== | 10.10.10.11 | bnx02.abcdprivate.com | 10.10.10.35:9999 | 0.025 | 0.026

然后还可以针对request做更细致的切分。比如URL参数部分。很明显,URL参数中的字段顺序是乱的。第一行问号之后的第一个字段是sign,第二行问号之后的第一个字段是appv。所以需要将字段进行切分,取出每个字段对应的值。官方自带grok满足不了要求,最终采用的Logstash配置如下:

filter {

    ruby {

        init =>"@kname =['http_x_forwarded_for','time_local','request','status',

            'body_bytes_sent','request_body','content_length','http_referer','http_

            user_agent','nuid','http_cookie','remote_addr','hostname','upstream_

            addr','upstream_response_time','request_time']"

        code => "

            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

            new_event.remove('@timestamp')

            event.append(new_event)

        "

    }

    }

    if [request] {

        ruby {

            init =>"@kname = ['method','uri','verb']"

            code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])

                new_event.remove('@timestamp')

                event.append(new_event)

        "

    }

        }

        if [uri] {

            ruby {

                    init =>"@kname = ['url_path','url_args']"

            code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])

                new_event.remove('@timestamp')

                event.append(new_event)

        "

    }

            }

            kv {

                prefix =>"url_"

                source =>"url_args"

                field_split =>"&"

                remove_field => [ "url_args","uri","request" ]

            }

        }

    }

    mutate {

        convert => [

            "body_bytes_sent" , "integer",

            "content_length", "integer",

            "upstream_response_time", "float",

            "request_time", "float"

        ]

    }

    date {

        match => [ "time_local", "dd/MMM/yyyy:hh:mm:ss Z" ]

        locale =>"en"

    }

}

最终结果如下:

{

"message" =>"1.43.3.188 | 08/Apr/2015:16:00:01 +0800 | POST /search/sug

    gest?appv=3.0.3&did=dfd5629d705d400795f698055806f01d&dt=iPhone7%2C2&im=

    AC926907-27AA-4A10-9916-C5DC75F29399&la=cn&latitude=-33.903867&lm=

    sina&longitude=151.208137&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=66

    7.000000&sw=375.000000&token=_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO&ts=

    1428480001567 HTTP/1.1 | 200 | 353 | abcd-sign-v1://a24b478486d3bb92ed89a-

    901541b60a5:b23e9d2c14fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:

    \\x220\\x22,\\x22token\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,

    \\x22limit\\x22:\\x2220\\x22} | 148 | - | abcdShopping/3.0.3 (iPhone; iOS

    8.1.3; Scale/2.00) | nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113

    | nuid=CgoKC1SvZJpkNHb5TpQwAg== | 10.10.10.11 | bnx02.abcdprivate.com |

    10.10.10.26:9999 | 0.070 | 0.071",

"@version" =>"1",

"@timestamp" =>"2015-04-08T08:00:01.000Z",

"type" =>"nginxapiaccess",

"host" =>"blog05.abcdprivate.com",

"path" =>"/home/nginx/logs/api.access.log",

"http_x_forwarded_for" =>"1.43.3.188",

"time_local" =>" 08/Apr/2015:16:00:01 +0800",

"status" =>"200",

"body_bytes_sent" => 353,

"request_body" =>"abcd-sign-v1://a24b478486d3bb92ed89a901541b60a5:b23e9d2c1

    4fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:\\x220\\x22,\\x22token

    \\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,\\x22limit\\x22:\\x2220\\x22}",

"content_length" => 148,

"http_referer" =>"-",

"http_user_agent" =>"abcdShopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00)",

"nuid" =>"nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113",

"http_cookie" =>"nuid=CgoKC1SvZJpkNHb5TpQwAg==",

"remote_addr" =>"10.10.10.11",

"hostname" =>"bnx02.abcdprivate.com",

"upstream_addr" =>"10.10.10.26:9999",

"upstream_response_time" => 0.070,

"request_time" => 0.071,

"method" =>"POST",

"verb" =>"HTTP/1.1",

"url_path" =>"/search/suggest",

"url_appv" =>"3.0.3",

"url_did" =>"dfd5629d705d400795f698055806f01d",

"url_dt" =>"iPhone7%2C2",

"url_im" =>"AC926907-27AA-4A10-9916-C5DC75F29399",

"url_la" =>"cn",

"url_latitude" =>"-33.903867",

"url_lm" =>"sina",

"url_longitude" =>"151.208137",

"url_lp" =>"-1.000000",

"url_net" =>"0-0-wifi",

"url_osn" =>"iOS",

"url_osv" =>"8.1.3",

"url_sh" =>"667.000000",

"url_sw" =>"375.000000",

"url_token" =>"_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO",

"url_ts" =>"1428480001567"

}

如果URL参数过多,可以不使用kv切分,或者预先定义成nested object后改成数组形式:

if [uri] {

    ruby {

        init =>"@kname = ['url_path','url_args']"

        code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])

                new_event.remove('@timestamp')

                event.append(new_event)

            "

        }

        if [url_args] {

            ruby {

                init => "@kname = ['key','value']"

                code => "event.set('nested_args', event.get('url_args').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"

            remove_field => [ "url_args","uri","request" ]

        }

    }

}

采用nested object的优化原理和nested object的使用方式,请阅读后面第11章中介绍Elasticsearch调优的内容。

3.1.3 JSON格式

自定义分隔符虽好,但是配置写起来毕竟复杂很多。其实对Logstash来说,Nginx日志还有另一种更简便的处理方式,就是自定义日志格式时,通过手工拼写直接输出成JSON 格式:

log_format json '{"@timestamp":"$time_iso8601",'

    '"host":"$server_addr",'

    '"clientip":"$remote_addr",'

    '"size":$body_bytes_sent,'

    '"responsetime":$request_time,'

    '"upstreamtime":"$upstream_response_time",'

    '"upstreamhost":"$upstream_addr",'

    '"http_host":"$host",'

    '"url":"$uri",'

    '"xff":"$http_x_forwarded_for",'

    '"referer":"$http_referer",'

    '"agent":"$http_user_agent",'

    '"status":"$status"}';

然后采用下面的Logstash配置即可:

input {

    file {

        path =>"/var/log/nginx/access.log"

        codec => json

    }

}

filter {

    mutate {

        split => [ "upstreamtime", "," ]

    }

    mutate {

        convert => [ "upstreamtime", "float" ]

    }

}

这里采用多个mutate插件,是因为upstreamtime可能有多个数值,所以先切割成数组以后,再分别转换成浮点型数值。而在mutate中,convert函数的执行优先级高于split函数,所以只能分开两步写。mutate内各函数的优先级顺序,之前2.3.8节有详细说明,读者可以返回去阅读。

3.1.4 syslog方式发送

Nginx从1.7版开始,加入了syslog支持,Tengine则更早。这样,我们可以通过syslog直接发送日志。Nginx上的配置如下:

access_log syslog:server=unix:/data0/rsyslog/nginx.sock locallog;

或者直接发送给远程Logstash机器:

access_log syslog:server=192.168.0.2:5140,facility=local6,tag=nginx-access,

    severity=info logstashlog;

默认情况下,Nginx将使用local7.info等级,以nginx为标签发送数据。注意,采用syslog发送日志的时候,无法配置buffer=16k选项。

3.2 Nginx错误日志

Nginx错误日志是运维人员最常见但又极其容易忽略的日志类型之一。本节介绍对Nginx错误日志的处理方式,并推荐读者在性能优化中对此多加关注。Nginx错误日志既没有统一明确的分隔符,也没有特别方便的正则模式,但通过Logstash不同插件的组合,还是可以轻松进行数据处理的。

值得注意的是,Nginx错误日志中有一类数据是接收过大请求体时的报错,默认信息会把请求体的具体字节数记录下来。每次请求的字节数基本都是在变化的,这意味着常用的topN等聚合函数对该字段没有明显效果。所以,对此需要做一下特殊处理。

最后形成的Logstash配置如下所示:

filter {

    grok {

        match => { "message" =>"(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d)

            \[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+), (?<errinfo>.*)$" }

    }

    mutate {

        rename => [ "host", "fromhost" ]

        gsub => [ "errmsg", "too large body: \d+ bytes", "too large body" ]

    }

    if [errinfo]

    {

        ruby {

            code => "

                new_event = LogStash::Event.new(Hash[event.get('errinfo').split(', ').map{|l| l.split(': ')}])

                new_event.remove('@timestamp')

                event.append(new_event)""

            "

        }

    }

    grok {

        match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_

            URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }

        patterns_dir =>["/etc/logstash/patterns"]

        remove_field => [ "message", "errinfo", "request" ]

    }

}

经过以上Logstash配置的Nginx错误日志生成的事件如下所示:

{

"@version": "1",

"@timestamp": "2015-07-02T01:26:40.000Z",

"type": "nginx-error",

"errtype": "error",

"errmsg": "client intended to send too large body",

"fromhost": "web033.mweibo.yf.sinanode.com",

"client": "36.16.7.17",

"server": "api.v5.weibo.cn",

"host": "\"api.weibo.cn\"",

"verb": "POST",

"urlpath": "/2/client/addlog_batch",

"urlparam": "gsid=_2A254UNaSDeTxGeRI7FMX9CrEyj2IHXVZRG1arDV6PUJbrdANLROskWp9b

    XakjUZM5792FW9A5S9EU4jxqQ..&wm=3333_2001&i=0c6f156&b=1&from=1053093010&c=

    iphone&v_p=21&skin=default&v_f=1&s=8f14e573&lang=zh_CN&ua=iPhone7,1__weibo__

    5.3.0__iphone__os8.3",

"httpversion": "1.1"

}

3.3 Postfix日志

Postfix是Linux平台上最常用的邮件服务器软件。邮件服务的运维复杂度一向较高,在此提供一个针对Postfix日志的解析处理方案。方案出自:https://github.com/whyscream/postfix-grok-patterns。

因为Postfix默认通过syslog方式输出日志,所以可以选择通过rsyslog直接转发给Logstash,也可以由Logstash读取rsyslog记录的文件。

Postfix会根据实际日志的不同,主动设置好不同的syslogtag,有anvil、bounce、cleanup、dnsblog、local、master、pickup、pipe、postdrop、postscreen、qmgr、scache、sendmail、smtp、lmtp、smtpd、tlsmgr、tlsproxy、trivial-rewrite和discard等20个不同的后缀,而在Logstash中,syslogtag通常被解析为program字段。本节以第一种anvil日志的处理配置作为示例:

input {

    syslog { }

}

filter {

    if [program] =~ /^postfix.*\/anvil$/ {

        grok {

            patterns_dir   =>["/etc/logstash/patterns.d"]

            match          => [ "message", "%{POSTFIX_ANVIL}" ]

            tag_on_failure => [ "_grok_postfix_anvil_nomatch" ]

            add_tag        => [ "_grok_postfix_success" ]

        }

    }

    mutate {

        convert => [

            "postfix_anvil_cache_size", "integer",

            "postfix_anvil_conn_count", "integer",

            "postfix_anvil_conn_rate", "integer",

        ]

    }

}

配置中使用了一个叫POSTFIX_ANVIL的自定义grok正则,该正则及其相关正则内容如下所示。将这段grok正则保存成文本文件,放入/etc/logstash/patterns.d/目录即可使用。

POSTFIX_TIME_UNIT %{NUMBER}[smhd]

POSTFIX_ANVIL_CONN_RATE statistics: max connection rate %{NUMBER:postfix_anvil_conn_

    rate}/%{POSTFIX_TIME_UNIT:postfix_anvil_conn_period} for \(%{DATA:postfix_

    service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_

    timestamp}

POSTFIX_ANVIL_CONN_CACHE statistics: max cache size %{NUMBER:postfix_anvil_

    cache_size} at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}

POSTFIX_ANVIL_CONN_COUNT statistics: max connection count %{NUMBER:postfix_

    anvil_conn_count} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at

    %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}

POSTFIX_ANVIL %{POSTFIX_ANVIL_CONN_RATE}|%{POSTFIX_ANVIL_CONN_CACHE}|%{POSTFIX_

    ANVIL_CONN_COUNT}

其余19种Postfix日志的完整grok正则和Logstash过滤配置,读者可以通过https://github.com/whyscream/postfix-grok-patterns获取。

3.4 Ossec日志

Ossec是一款开源的多平台入侵检测系统。将Ossec的监测报警信息转发到ELK中,无疑可以极大地帮助我们快速可视化安全事件。本节介绍Ossec与Logstash的结合方式。

3.4.1 配置所有Ossec agent采用syslog输出

配置步骤如下:

1)编辑ossec.conf文件(默认为/var/ossec/etc/ossec.conf)。

2)在ossec.conf中添加下列内容(10.0.0.1为接收syslog的服务器):

<syslog_output>

<server>10.0.0.1</server>

<port>9000</port>

<format>default</format>

</syslog_output>

3)开启Ossec允许syslog输出功能:

/var/ossec/bin/ossec-control enable client-syslog

4)重启Ossec服务:

/var/ossec/bin/ossec-control start

3.4.2 配置Logstash

在Logstash配置文件中增加(或新建)如下内容(假设10.0.0.1为Elasticsearch服务器):

input {

    udp {

        port => 9000

        type =>"syslog"

    }

}

filter {

    if [type] == "syslog" {

        grok {

            match => { "message" =>"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:

                syslog_host} %{DATA:syslog_program}: Alert Level: %{BASE10NUM:

                Alert_Level}; Rule: %{BASE10NUM:Rule} - %{GREEDYDATA:Description};

                Location: %{GREEDYDATA:Details}" }

            add_field => [ "ossec_server", "%{host}" ]

        }

        mutate {

            remove_field => [ "syslog_hostname", "syslog_message", "syslog_pid",

                "message", "@version", "type", "host" ]

        }

    }

}

output {

    elasticsearch {

    }

}

3.4.3 推荐Kibana仪表盘

社区已经有人根据Ossec的常见需求制作了仪表盘,可以直接从Kibana 3页面加载使用,示例如图3-1所示。

仪表盘的JSON文件见:https://github.com/magenx/Logstash/raw/master/kibana/kibana_dash-board.json。

加载方式请阅读本书第三部分介绍的Kibana相关内容。

3.5 Windows系统日志

Logstash社区有众多的Windows用户,本节单独介绍一下对Windows平台系统日志的收集处理。之前介绍过Linux上的系统日志,即syslog的处理。事实上,对于Windows平台,也有类似syslog的设计,叫eventlog。本节介绍如何处理Windows eventlog。

3.5.1 采集端配置

由于Logstash作者出身Linux运维,早期版本中出了不少Windows平台上独有的bug。所以,目前对Windows上的日志,推荐大家在尝试Logstash的同时,也可以试用更稳定的nxlog软件。nxlog更详细的介绍,请阅读本书后面5.5节。

这里先介绍Logstash和nxlog在处理Windows的eventlog时的配置方法。

Logstash配置如下:

 

图3-1 Ossec仪表盘

input {

    eventlog {

        #logfile =>  ["Application", "Security", "System"]

        logfile =>  ["Security"]

        type =>"winevent"

        tags => [ "caen" ]

    }

}

nxlog配置中有如下几个要点:

1)ROOT位置必须是nxlog的实际安装路径。

2)输入模块,在Windows 2003及之前版本上,不叫im_msvistalog而叫im_mseventlog。

下面是一段完整的nxlog配置示例:

define ROOT C:\Program Files (x86)\nxlog

Moduledir %ROOT%\modules

CacheDir %ROOT%\data

Pidfile %ROOT%\data\nxlog.pid

SpoolDir %ROOT%\data

LogFile %ROOT%\data\nxlog.log

<Extension json>

    Module  xm_json

</Extension>

<Input in>

    Module  im_msvistalog

    Exec    to_json();

</Input>

<Output out>

    Module  om_tcp

    Host    10.66.66.66

    Port    5140

</Output>

<Route 1>

    Path    in => out

</Route>

3.5.2 接收解析端配置

在中心的接收端,统一采用Logstash来完成解析入库操作。如果采集端也是Logstash,主要字段都已经生成,接收端配置也就没什么特别的了。如果采集端是nxlog,那么我们还需要把一些nxlog生成的字段转换成Logstash更通用的风格设计。

在之前插件介绍章节我们已经讲过,因为在Elasticsearch中默认按小写来检索,所以需要尽量把数据小写化。不巧的是,nxlog中,不单数据内容,字段名称也是大小写混用的,所以,我们只能通过logstash-filter-mutate的rename功能来完成对字段名称的小写化重命名。

配置示例如下:

input {

    tcp {

        codec =>"json"

        port => 5140

        tags => ["windows","nxlog"]

        type =>"nxlog-json"

    }

} # end input

 

filter {

    if [type] == "nxlog-json" {

        date {

            match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]

            timezone =>"Europe/London"

        }

        mutate {

            rename => [ "AccountName", "user" ]

            rename => [ "AccountType", "[eventlog][account_type]" ]

            rename => [ "ActivityId", "[eventlog][activity_id]" ]

            rename => [ "Address", "ip6" ]

            rename => [ "ApplicationPath", "[eventlog][application_path]" ]

            rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ]

            rename => [ "Category", "[eventlog][category]" ]

            rename => [ "Channel", "[eventlog][channel]" ]

            rename => [ "Domain", "domain" ]

            rename => [ "EventID", "[eventlog][event_id]" ]

            rename => [ "EventType", "[eventlog][event_type]" ]

            rename => [ "File", "[eventlog][file_path]" ]

            rename => [ "Guid", "[eventlog][guid]" ]

            rename => [ "Hostname", "hostname" ]

            rename => [ "Interface", "[eventlog][interface]" ]

            rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ]

            rename => [ "InterfaceName", "[eventlog][interface_name]" ]

            rename => [ "IpAddress", "ip" ]

            rename => [ "IpPort", "port" ]

            rename => [ "Key", "[eventlog][key]" ]

            rename => [ "LogonGuid", "[eventlog][logon_guid]" ]

            rename => [ "Message", "message" ]

            rename => [ "ModifyingUser", "[eventlog][modifying_user]" ]

            rename => [ "NewProfile", "[eventlog][new_profile]" ]

            rename => [ "OldProfile", "[eventlog][old_profile]" ]

            rename => [ "Port", "port" ]

            rename => [ "PrivilegeList", "[eventlog][privilege_list]" ]

            rename => [ "ProcessID", "pid" ]

            rename => [ "ProcessName", "[eventlog][process_name]" ]

            rename => [ "ProviderGuid", "[eventlog][provider_guid]" ]

            rename => [ "ReasonCode", "[eventlog][reason_code]" ]

            rename => [ "RecordNumber", "[eventlog][record_number]" ]

            rename => [ "ScenarioId", "[eventlog][scenario_id]" ]

            rename => [ "Severity", "level" ]

            rename => [ "SeverityValue", "[eventlog][severity_code]" ]

            rename => [ "SourceModuleName", "nxlog_input" ]

            rename => [ "SourceName", "[eventlog][program]" ]

            rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ]

            rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ]

            rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ]

            rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ]

            rename => [ "System", "[eventlog][system]" ]

            rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ]

            rename => [ "TargetLogonId", "[eventlog][target_logonid]" ]

            rename => [ "TargetUserName", "[eventlog][target_user_name]" ]

            rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ]

            rename => [ "ThreadID", "thread" ]

        }

        mutate {

            remove_field => [

                "CurrentOrNextState","Description","EventReceivedTime","EventTime","EventTimeWritten","IPVersion","KeyLength","Keywords","LmPackageName","LogonProcessName","LogonType","Name","O-pcode","OpcodeValue","PolicyProcessingMode","Protocol","Prot-ocolType","SourceModuleType","State","Task","TransmittedSe-  rvices","Type","UserID","Version"

            ]

        }

    }

}

3.6 Java日志

之前在2.2节有关codec的介绍中曾经提到过,对Java日志,除了使用multiline做多行日志合并以外,还可以直接通过Log4J写入logstash里。本节就讲述如何在Java应用环境做到这点。

3.6.1 Log4J配置

首先,需要配置Java应用的Log4J设置,启动一个内置的SocketAppender。修改应用的log4j.xml配置文件,添加如下配置段:

<appender name="LOGSTASH" class="org.apache.log4j.net.SocketAppender">

<param name="RemoteHost" value="logstash_hostname" />

<param name="ReconnectionDelay" value="60000" />

<param name="LocationInfo" value="true" />

<param name="Threshold" value="DEBUG" />

</appender>

然后把这个新定义的appender对象加入root logger里,可以跟其他已有logger共存:

<root>

<level value="INFO"/>

<appender-ref ref="OTHERPLACE"/>

<appender-ref ref="LOGSTASH"/>

</root>

如果是log4j.properties配置文件,则对应配置如下:

log4j.rootLogger=DEBUG, logstash

 

###SocketAppender###

log4j.appender.logstash=org.apache.log4j.net.SocketAppender

log4j.appender.logstash.Port=4560

log4j.appender.logstash.RemoteHost=logstash_hostname

log4j.appender.logstash.ReconnectionDelay=60000

log4j.appender.logstash.LocationInfo=true

Log4J会持续尝试连接你配置的logstash_hostname这个地址,建立连接后,即开始发送日志数据。

3.6.2 Logstash配置

Java应用端的配置完成以后,开始设置Logstash的接收端。配置如下所示,其中4560端口是Log4J SocketAppender的默认对端端口:

input {

    log4j {

        type =>"log4j-json"

        port => 4560

    }

}

3.6.3 异常堆栈测试验证

运行Logstash后,编写一个简单的Log4J程序:

import org.apache.log4j.Logger;

public class HelloExample{

    final static Logger logger = Logger.getLogger(HelloExample.class);

    public static void main(String[] args) {

        HelloExample obj = new HelloExample();

        try{

            obj.divide();

        }catch(ArithmeticException ex){

            logger.error("Sorry, something wrong!", ex);

        }

    }

    private void divide(){

        int i = 10 /0;

    }

}

编译运行:

# javac -cp ./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-

    log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample.java

# java -cp .:./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-

    log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample

这样即可在Logstash的终端输出看到如下事件记录:

{

"message" =>"Sorry, something wrong!",

"@version" =>"1",

"@timestamp" =>"2015-07-02T13:24:45.727Z",

"type" =>"log4j-json",

"host" =>"127.0.0.1:52420",

"path" =>"HelloExample",

"priority" =>"ERROR",

"logger_name" =>"HelloExample",

"thread" =>"main",

"class" =>"HelloExample",

"file" =>"HelloExample.java:9",

"method" =>"main",

"stack_trace" =>"java.lang.ArithmeticException: / by zero\n\tat HelloExample.

    divide(HelloExample.java:13)\n\tat HelloExample.main(HelloExample.java:7)"

}

可以看到,异常堆栈直接记录在单行内了。

3.6.4 JSON Event layout

如果无法采用SocketAppender,必须使用文件方式的,其实Log4J有一个layout特性,用来控制日志输出的格式。和Nginx日志自己拼接JSON输出类似,也可以通过layout功能记录成JSON格式。

Logstash官方提供了扩展包,可以通过mvnrepository.com搜索下载:

# wget http://central.maven.org/maven2/net/logstash/log4j/jsonevent-layout/1.7/

    jsonevent-layout-1.7.jar

或者直接编辑自己项目的pom.xml添加依赖:

<dependency>

    <groupId>net.logstash.log4j</groupId>

    <artifactId>jsonevent-layout</artifactId>

    <version>1.7</version>

</dependency>

然后修改项目的log4j.properties文件如下:

log4j.rootCategory=WARN, RollingLog

log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender

log4j.appender.RollingLog.Threshold=TRACE

log4j.appender.RollingLog.File=api.log

log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd

log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1

如果是log4j.xml,则修改如下:

<appender name="Console" class="org.apache.log4j.ConsoleAppender">

    <param name="Threshold" value="TRACE" />

    <layout class="net.logstash.log4j.JSONEventLayoutV1" />

</appender>

生成的文件就是符合Logstash标准的JSON格式了,Logstash使用下面配置读取:

input {

    file {

        codec => json

        path => ["/path/to/log4j.log"]

    }

}

生成的Logstash事件如下:

{

    "mdc":{},

    "line_number":"29",

    "class":"org.eclipse.jetty.examples.logging.EchoFormServlet",

    "@version":1,

    "source_host":"jvstratusmbp.local",

    "thread_name":"qtp513694835-14",

    "message":"Got request from 0:0:0:0:0:0:0:1%0 using Mozilla\/5.0 (Macintosh;Intel Mac OS X 10_9_1) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/32.0.1700.77 Safari\/537.36",

    "@timestamp":"2014-01-27T19:52:35.738Z",

    "level":"INFO",

    "file":"EchoFormServlet.java",

    "method":"doPost",

    "logger_name":"org.eclipse.jetty.examples.logging.EchoFormServlet"

}

可以看到,同样达到了效果。

如果你使用的不是Log4J而是logback项目来记录Java日志,Logstash官方也有类似的扩展包,在pom.xml中改成如下定义即可:

<dependency>

    <groupId>net.logstash.logback</groupId>

    <artifactId>logstash-logback-encoder</artifactId>

    <version>4.4</version>

</dependency>

3.7 MySQL慢查询日志

MySQL有多种日志可以记录,常见的有error log、slow log、general log、bin log等。其中slow log作为性能监控和优化的入手点,最为首要。本节即讨论如何用Logstash处理slow log。至于general log,格式处理基本类似,不过由于general量级比slow大得多,推荐采用packetbeat协议解析的方式更高效地完成这项工作,相关内容阅读本书稍后8.3节。

MySQL slow log 的Logstash处理配置示例如下:

input {

    file {

        type =>"mysql-slow"

        path =>"/var/log/mysql/mysql-slow.log"

        codec => multiline {

            pattern =>"^# User@Host:"

            negate => true

            what =>"previous"

        }

    }

}

 

filter {

    # drop sleep events

    grok {

        match => { "message" =>"SELECT SLEEP" }

        add_tag => [ "sleep_drop" ]

        tag_on_failure => [] # prevent default _grokparsefailure tag on real records

    }

    if "sleep_drop" in [tags] {

        drop {}

    }

    grok {

        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clien-thost>\S*) )?\[(?:%{IP:clientip})?\]\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n# Time:.*$" ]

    }

    date {

        match => [ "timestamp", "UNIX" ]

        remove_field => [ "timestamp" ]

    }

}

配置中,利用了grok插件的add_tag选项仅在成功时添加,而tag_on_failure选项仅在失败时添加的互斥特性,巧妙地过滤出日志中无用的sleep语句删除掉。

如下一段多行的MySQL slow log:

# User@Host: logstash[logstash] @ localhost [127.0.0.1]

# Query_time: 5.310431  Lock_time: 0.029219 Rows_sent: 1  Rows_examined: 24575727

SET timestamp=1393963146;

select count(*) from node join variable order by rand();

# Time: 140304 19:59:14

通过运行上面的配置,Logstash即可处理成如下单个事件:

{

"@timestamp" =>"2014-03-04T19:59:06.000Z",

"message" =>"# User@Host: logstash[logstash] @ localhost [127.0.0.1]\n# Query_

    time: 5.310431  Lock_time: 0.029219 Rows_sent: 1  Rows_examined: 24575727\nSET

    timestamp=1393963146;\nselect count(*) from node join variable order by rand();

    \n# Time: 140304 19:59:14",

"@version" =>"1",

"tags" => [

        [0] "multiline"

    ],

"type" =>"mysql-slow",

"host" =>"raochenlindeMacBook-Air.local",

"path" =>"/var/log/mysql/mysql-slow.log",

"user" =>"logstash",

"clienthost" =>"localhost",

"clientip" =>"127.0.0.1",

"query_time" => 5.310431,

"lock_time" => 0.029219,

"rows_sent" => 1,

"rows_examined" => 24575727,

"query" =>"select count(*) from node join variable order by rand();",

"action" =>"select"

}

后续即可针对其中的action、query_time、lock_time和rows_examined字段做监控报警及Kibana可视化统计了。

3.8 Docker日志

Docker是目前大规模互联网基础架构解决方案中最热门的技术。它带给运维工程师一个截然不同的思考角度和工作方式。

就日志层面看,Docker最大的影响在于:其最佳实践要求一个容器内部只有一个生命周期随时可以消亡的服务进程。这也就意味着:传统的写入磁盘,固定采集方式的日志系统,无法正常发挥作用。所以,在容器服务中,记录日志需要采用另外的方式。本节将介绍其中最常见的两种:记录到主机磁盘,或通过logspout收集。

3.8.1 记录到主机磁盘

默认情况下,Docker会将容器的标准输出和错误输出,保存在主机的/var/lib/docker/containers/目录下。所以,在规模比较稳定的情况下,直接记录到主机磁盘,然后通过主机上的Logstash收集日志,也是不错的方案。

以Nginx为例,将Nginx访问日志和错误日志输出到标准输出的配置如下:

daemon off;

error_log /dev/stdout info;

http {

    access_log /dev/stdout;

    ...

}

不过,容器的特殊性在这里又一次体现出来,容器中其实是没有/dev/stdout设备的。所以我们需要自己单独处理一下,在Dockerflie里加上一句:

RUN ln -sf /proc/self/fd /dev/

这样,既保证了nginx.conf是主机和容器通用的配置,又顺利达到目的。

然后通过如下Logstash配置收集即可:

input {

    file {

        path => ["/var/lib/docker/containers/*/*-json.log"]

        codec => json

    }

}

filter {

    grok {

        match => ["path"。"/(?<container_id>\w+)-json.log" ]

        remove_field => ["path"]

    }

    date {

        match => ["time", "ISO8601"]

    }

}

3.8.2 通过logspout收集

logspout是Docker生态圈中最有名的日志收集方式,其设计思路是:每个主机上启动一个单独容器运行logspout服务,负责将同一个主机上其他容器的日志,根据route设定,转发给不同的接收端。

logspout的基本用法如下:

$ docker pull gliderlabs/logspout:latest

$ docker run --name="logspout" \

    --volume=/var/run/docker.sock:/tmp/docker.sock \

gliderlabs/logspout \

--publish=127.0.0.1:8000:80

    syslog://remoteaddr:514

此外,logspout提供动态变更route的方式,如下所示:

# curl $(docker port `docker ps -lq` 8000)/routes \

    -X POST \

    -d '{"source": {"filter_name": "*_db", "types": ["stderr"]}, "target":

        {"type": "syslog", "addr": "remoteaddr2:5140"}}'

这个配置的意思是,将容器名带有db字样的走错误输出的采集的日志,以syslog协议发送到remoteaddr2主机的5140端口。

注意,logspout采用的是RFC5424版本的syslog协议,所以如果使用的接收方是RFC3164版本的syslog协议解析,需要自己调整一下。比如logstash-input-syslog采用的就是RFC3164协议,所以需要自己来另外完成:

input {

    tcp {

        port => 5140

    }

}

filter {

    grok {

        match => [“message”, “<SYSLOG5424PRI:syslog_pri> %{SYSLOG5424LINE:message}“]

    }

}

此外,logspout支持模块化扩展,所以,我们也可以直接在logspout上处理成对Logstash更友好的格式。扩展logspout支持Logstash格式的方法如下:

1)编辑Dockerfile,修改成如下内容:

FROM gliderlabs/logspout:master

ENV ROUTE_URIS=logstash://host:port

2)编辑modules.go,修改成如下内容:

package main

import (

    _ "github.com/looplab/logspout-logstash"

    _ "github.com/gliderlabs/logspout/transports/udp"

)

3)构建镜像:

docker build

这样,后续Logstash就直接进行JSON解析即可。

相关文章
|
3月前
|
存储 Prometheus 监控
Prometheus vs. ELK Stack:容器监控与日志管理工具的较量
随着容器化技术的广泛应用,容器监控与日志管理成为了关键任务。本文将对两种常用工具进行比较与选择,分别是Prometheus和ELK Stack。Prometheus是一款开源的监控系统,专注于时序数据的收集和告警。而ELK Stack则是一套完整的日志管理解决方案,由Elasticsearch、Logstash和Kibana三个组件组成。通过比较它们的特点、优势和适用场景,读者可以更好地了解如何选择适合自己需求的工具。
|
3月前
|
Go 数据处理 Docker
elk stack部署自动化日志收集分析平台
elk stack部署自动化日志收集分析平台
76 0
|
3月前
|
Prometheus 监控 Cloud Native
Prometheus VS ELK Stack:容器监控与日志管理工具的比较与选择
在容器化时代,有效的容器监控与日志管理工具对于确保应用程序的可靠性和可维护性至关重要。本文将比较两个主流工具,Prometheus和ELK Stack,探讨它们在容器监控和日志管理方面的特点、优势和适用场景,帮助读者做出明智的选择。
|
10月前
|
存储 监控 安全
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
647 0
|
11月前
|
消息中间件 缓存 负载均衡
【日志架构】ELK Stack + Kafka 端到端练习
【日志架构】ELK Stack + Kafka 端到端练习
|
11月前
|
消息中间件 监控 固态存储
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
BXA
|
11月前
|
Prometheus Kubernetes 监控
搭建高效微服务架构:Kubernetes、Prometheus和ELK Stack的完美组合
微服务架构是一种软件设计模式,它将单个应用程序拆分成一组更小、更独立的服务。每个服务在自己的进程中运行,并使用轻量级通信机制进行通信。由于每个服务都是独立的,因此可以独立部署、扩展和更新,从而使开发和运维更加容易。
BXA
292 0
|
消息中间件 数据采集 存储
客户端同学应该理解的 ELK Stack 组件知识
客户端同学应该理解的 ELK Stack 组件知识
126 0
客户端同学应该理解的 ELK Stack 组件知识
|
存储 监控 安全
【Elastic Stack】 搭建最新 ELK 日志分析系统 8.2.2版
大家好,我是无名小歌。 今天给大家分享一个centos7系统搭建2022年最新ELK日志分析系统,目前版本是8.2.2。值得注意的是安装 ELK 时,您必须在整个ELK中使用相同的版本,如:Elasticsearch 8.2.2,则安装Kibana 8.2.2 和 Logstash 8.2.2,如果出现不对应的情况,如:Elasticsearch 是8.2.2版本、Kibana-6.8等或是其他版本,则需要进行对应版本的升级到8.2.2版本。
1254 0
【Elastic Stack】 搭建最新 ELK 日志分析系统 8.2.2版
|
Kubernetes 数据可视化 API
Kubernetes中部署ELK Stack日志收集平台(下)
Kubernetes中部署ELK Stack日志收集平台
Kubernetes中部署ELK Stack日志收集平台(下)