《ELK Stack权威指南(第2版)》一2.1.1 标准输入-阿里云开发者社区

开发者社区> 华章出版社> 正文
登录阅读全文

《ELK Stack权威指南(第2版)》一2.1.1 标准输入

简介:

本节书摘来自华章出版社《ELK Stack权威指南(第2版)》一书中的第2章,第2.1节,作者 饶琛琳  更多章节内容可以访问云栖社区“华章计算机”公众号查看。 


第2章

插 件 配 置

插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。

2.1 输入插件

在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。

限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。

以上请读者自明。

2.1.1 标准输入

我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。

配置示例如下:

input {

    stdin {

        add_field => {"key" =>"value"}

        codec =>"plain"

        tags => ["add"]

        type =>"std"

    }

}

用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:

{

"message" =>"hello world",

"@version" =>"1",

"@timestamp" =>"2014-08-08T06:48:47.789Z",

"type" =>"std",

"tags" => [

        [0] "add"

    ],

"key" =>"value",

"host" =>"raochenlindeMacBook-Air.local"

}

type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。

最常见的用法是像下面这样:

input {

    stdin {

        type =>"web"

    }

}

filter {

    if [type] == "web" {

        grok {

            match => ["message", %{COMBINEDAPACHELOG}]

        }

    }

}

output {

    if "_grokparsefailure" in [tags] {

        nagios_nsca {

            nagios_status =>"1"

        }

    } else {

        elasticsearch {

        }

    }

}

看起来蛮复杂的,对吧?

继续学习,你也可以写出来的。

2.1.2 文件输入

分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。

Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。

sincedb文件中记录了每个被监听的文件的inode, major number, minor number和pos。

配置示例如下:

input {

    file {

        path => ["/var/log/*.log", "/var/log/message"]

        type =>"system"

        start_position =>"beginning"

    }

}

有一些比较有用的配置项,可以用来指定FileWatch库的行为:

discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。

exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。

sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。

stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。

start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。

close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3 600秒,即一小时。

ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86 400秒,即一天。

注意事项如下:

1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。

2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path =>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。

5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。

2.1.3 TCP输入

未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

配置示例如下:

input {

    tcp {

        port => 8888

        mode =>"server"

        ssl_enable => false

    }

}

目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:

# nc 127.0.0.1 8888 < olddata

这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

2.1.4 syslog输入

syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。

我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。

有关rsyslog的用法,后面的5.4节会有更详细的介绍。

配置示例如下:

input {

    syslog {

        port =>"514"

    }

}

作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:

{

"message" =>"Hello World",

"@version" =>"1",

"@timestamp" =>"2014-08-08T09:01:15.911Z",

"host" =>"127.0.0.1",

"priority" =>31,

"timestamp" =>"Aug  8 17:01:15",

"logsource" =>"raochenlindeMacBook-Air.local",

"program" =>"com.apple.metadata.mdflagwriter",

"pid" =>"381",

"severity" =>7,

"facility" =>3,

"facility_label" =>"system",

"severity_label" =>"Debug"

}

Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:

input {

    tcp {

        port =>"8514"

    }

}

filter {

    grok {

        match => ["message", "%{SYSLOGLINE}" ]

    }

    syslog_pri { }

}

建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。

因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

Recv-Q

228096

228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!

强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!

虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!

才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!

测试采用Logstash作者提供的命令:

yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000

出处见:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

2.1.5 http_poller抓取

Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。

更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。

比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:

http {

    vhost_traffic_status_zone;

    map $uri $filter_uri {

        default 'non-core';

        /2/api/timeline core;

        ~^/2/api/unread core;

    }

    server {

        vhost_traffic_status_filter_by_set_key $filter_uri;

        location /status {

            auth_basic "Restricted";

            auth_basic_user_file pass_file;

            vhost_traffic_status_display;

            vhost_traffic_status_display_format json;

        }

    }

}

则对应的logstash-input-http_poller配置如下:

input {

    http_poller {

        urls => {

            0 => {

                method => get

                url => "http://localhost:80/status/format/json"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

            1 => {

                method => get

                url => "http://localhost:80/status/con ... up%3D*"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

        }

        request_timeout => 60

        interval => 60

        codec => "json"

    }

}

这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。

注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享: