《ELK Stack权威指南(第2版)》一2.1.1 标准输入

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

本节书摘来自华章出版社《ELK Stack权威指南(第2版)》一书中的第2章,第2.1节,作者 饶琛琳  更多章节内容可以访问云栖社区“华章计算机”公众号查看。 


第2章

插 件 配 置

插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。

2.1 输入插件

在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。

限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。

以上请读者自明。

2.1.1 标准输入

我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。

配置示例如下:

input {

    stdin {

        add_field => {"key" =>"value"}

        codec =>"plain"

        tags => ["add"]

        type =>"std"

    }

}

用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:

{

"message" =>"hello world",

"@version" =>"1",

"@timestamp" =>"2014-08-08T06:48:47.789Z",

"type" =>"std",

"tags" => [

        [0] "add"

    ],

"key" =>"value",

"host" =>"raochenlindeMacBook-Air.local"

}

type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。

最常见的用法是像下面这样:

input {

    stdin {

        type =>"web"

    }

}

filter {

    if [type] == "web" {

        grok {

            match => ["message", %{COMBINEDAPACHELOG}]

        }

    }

}

output {

    if "_grokparsefailure" in [tags] {

        nagios_nsca {

            nagios_status =>"1"

        }

    } else {

        elasticsearch {

        }

    }

}

看起来蛮复杂的,对吧?

继续学习,你也可以写出来的。

2.1.2 文件输入

分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。

Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。

sincedb文件中记录了每个被监听的文件的inode, major number, minor number和pos。

配置示例如下:

input {

    file {

        path => ["/var/log/*.log", "/var/log/message"]

        type =>"system"

        start_position =>"beginning"

    }

}

有一些比较有用的配置项,可以用来指定FileWatch库的行为:

discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。

exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。

sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。

stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。

start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。

close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3 600秒,即一小时。

ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86 400秒,即一天。

注意事项如下:

1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。

2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path =>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。

5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。

2.1.3 TCP输入

未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

配置示例如下:

input {

    tcp {

        port => 8888

        mode =>"server"

        ssl_enable => false

    }

}

目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:

# nc 127.0.0.1 8888 < olddata

这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

2.1.4 syslog输入

syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。

我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。

有关rsyslog的用法,后面的5.4节会有更详细的介绍。

配置示例如下:

input {

    syslog {

        port =>"514"

    }

}

作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:

{

"message" =>"Hello World",

"@version" =>"1",

"@timestamp" =>"2014-08-08T09:01:15.911Z",

"host" =>"127.0.0.1",

"priority" =>31,

"timestamp" =>"Aug  8 17:01:15",

"logsource" =>"raochenlindeMacBook-Air.local",

"program" =>"com.apple.metadata.mdflagwriter",

"pid" =>"381",

"severity" =>7,

"facility" =>3,

"facility_label" =>"system",

"severity_label" =>"Debug"

}

Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:

input {

    tcp {

        port =>"8514"

    }

}

filter {

    grok {

        match => ["message", "%{SYSLOGLINE}" ]

    }

    syslog_pri { }

}

建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。

因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

Recv-Q

228096

228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!

强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!

虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!

才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!

测试采用Logstash作者提供的命令:

yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000

出处见:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

2.1.5 http_poller抓取

Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。

更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。

比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:

http {

    vhost_traffic_status_zone;

    map $uri $filter_uri {

        default 'non-core';

        /2/api/timeline core;

        ~^/2/api/unread core;

    }

    server {

        vhost_traffic_status_filter_by_set_key $filter_uri;

        location /status {

            auth_basic "Restricted";

            auth_basic_user_file pass_file;

            vhost_traffic_status_display;

            vhost_traffic_status_display_format json;

        }

    }

}

则对应的logstash-input-http_poller配置如下:

input {

    http_poller {

        urls => {

            0 => {

                method => get

                url => "http://localhost:80/status/format/json"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

            1 => {

                method => get

                url => "http://localhost:80/status/con ... up%3D*"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

        }

        request_timeout => 60

        interval => 60

        codec => "json"

    }

}

这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。

注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
4月前
|
存储 Prometheus 监控
Prometheus vs. ELK Stack:容器监控与日志管理工具的较量
随着容器化技术的广泛应用,容器监控与日志管理成为了关键任务。本文将对两种常用工具进行比较与选择,分别是Prometheus和ELK Stack。Prometheus是一款开源的监控系统,专注于时序数据的收集和告警。而ELK Stack则是一套完整的日志管理解决方案,由Elasticsearch、Logstash和Kibana三个组件组成。通过比较它们的特点、优势和适用场景,读者可以更好地了解如何选择适合自己需求的工具。
|
4月前
|
Go 数据处理 Docker
elk stack部署自动化日志收集分析平台
elk stack部署自动化日志收集分析平台
87 0
|
4月前
|
Prometheus 监控 Cloud Native
Prometheus VS ELK Stack:容器监控与日志管理工具的比较与选择
在容器化时代,有效的容器监控与日志管理工具对于确保应用程序的可靠性和可维护性至关重要。本文将比较两个主流工具,Prometheus和ELK Stack,探讨它们在容器监控和日志管理方面的特点、优势和适用场景,帮助读者做出明智的选择。
|
11月前
|
存储 监控 安全
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
【Elastic Stack-初识篇】 ELK介绍、搭建最新 ELK 日志分析系统
725 0
|
12月前
|
消息中间件 缓存 负载均衡
【日志架构】ELK Stack + Kafka 端到端练习
【日志架构】ELK Stack + Kafka 端到端练习
|
12月前
|
消息中间件 监控 固态存储
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
带你读《Elastic Stack 实战手册》之71:——4.1.3.企业ELK日志搜索引擎
BXA
|
12月前
|
Prometheus Kubernetes 监控
搭建高效微服务架构:Kubernetes、Prometheus和ELK Stack的完美组合
微服务架构是一种软件设计模式,它将单个应用程序拆分成一组更小、更独立的服务。每个服务在自己的进程中运行,并使用轻量级通信机制进行通信。由于每个服务都是独立的,因此可以独立部署、扩展和更新,从而使开发和运维更加容易。
BXA
337 0
|
消息中间件 数据采集 存储
客户端同学应该理解的 ELK Stack 组件知识
客户端同学应该理解的 ELK Stack 组件知识
132 0
客户端同学应该理解的 ELK Stack 组件知识
|
存储 监控 安全
【Elastic Stack】 搭建最新 ELK 日志分析系统 8.2.2版
大家好,我是无名小歌。 今天给大家分享一个centos7系统搭建2022年最新ELK日志分析系统,目前版本是8.2.2。值得注意的是安装 ELK 时,您必须在整个ELK中使用相同的版本,如:Elasticsearch 8.2.2,则安装Kibana 8.2.2 和 Logstash 8.2.2,如果出现不对应的情况,如:Elasticsearch 是8.2.2版本、Kibana-6.8等或是其他版本,则需要进行对应版本的升级到8.2.2版本。
1264 0
【Elastic Stack】 搭建最新 ELK 日志分析系统 8.2.2版