Logstash is a real-time, pipeline-based data collection engine. Like a pipe that processes data, it gathers scattered data, aggregates and transforms it, and hands the result downstream for analysis and presentation.
Logstash can collect data in combination with the Beats shippers or other third-party components; after processing, the data is typically stored in Elasticsearch for search and analysis.
Beyond Beats, Logstash ships with a rich set of plugins supporting many kinds of data sources, including JDBC, Kafka, and HTTP.
Logstash processes data through pipelines, where a pipeline is an abstraction of a data processing flow. Within a pipeline, events from upstream sources are gathered into a message queue, transformed by multiple worker threads, and finally written to downstream components. A single Logstash instance can run multiple pipelines.
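As a rough mental model of the queue-plus-workers design described above (this is not Logstash code, just a toy Python sketch with a hypothetical uppercase transform standing in for filter logic):

```python
import queue
import threading

# Events from the input stage land on an in-memory queue; worker
# threads pull them off, apply the transformation, and push results
# to the output stage.
events_in = queue.Queue()
events_out = queue.Queue()

def worker():
    while True:
        event = events_in.get()
        if event is None:              # shutdown signal
            return
        events_out.put(event.upper())  # stand-in for real filter logic

workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

for msg in ["hello", "logstash"]:      # the input stage
    events_in.put(msg)
for _ in workers:                      # one shutdown signal per worker
    events_in.put(None)
for w in workers:
    w.join()

results = sorted(events_out.queue)     # drain the output stage
print(results)  # ['HELLO', 'LOGSTASH']
```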
Basic concepts
- pipeline: the logical abstraction of one data processing flow, like a pipe — data flows in at one end, is processed, and flows out the other; a pipeline consists of input, filter, and output stages, where input and output are required and filter is optional;
- instance: one Logstash process, which can contain multiple data processing flows, i.e. multiple pipelines;
- inputs: input plugins that connect to data sources and ingest data; they support codecs, allowing data to be encoded and decoded; required;
- filters: filter plugins that transform and enrich incoming data; optional;
- outputs: output plugins that send processed data to downstream components; they also support codecs for encoding and decoding; required;
- event: data in a pipeline is event-based; an event is one record or message in the data stream;
Installing Logstash
1) Java 8 is required, and Java 9 is not supported: check your Java version and set the JAVA_HOME environment variable. Logstash is built on JRuby; Logstash 6.x requires Java 8 and currently does not support Java 9;
2) Download and extract: download Logstash and unpack the archive; note that the Logstash installation path must not contain colons;
tar -zxvf logstash-6.2.2.tar.gz
cd logstash-6.2.2
3) Start Logstash and publish your first event: use -e to define a pipeline inline that reads events from stdin and writes them to stdout;
bin/logstash -e 'input { stdin { } } output { stdout {} }'
You should see logs similar to:
Sending Logstash's logs to /home/work/zion_package/elastic/logstash/logstash-6.2.2/logs which is now configured via log4j2.properties
[2018-03-14T14:48:00,053][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/modules/fb_apache/configuration"}
[2018-03-14T14:48:00,074][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/modules/netflow/configuration"}
[2018-03-14T14:48:00,172][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/queue"}
[2018-03-14T14:48:00,178][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/dead_letter_queue"}
[2018-03-14T14:48:00,631][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-14T14:48:00,672][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"5901ab9f-fdc9-43dc-a88d-c5c636cf8224", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/uuid"}
[2018-03-14T14:48:01,337][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-14T14:48:01,733][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-14T14:48:03,264][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-14T14:48:03,453][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x128244e5 run>"}
The stdin plugin is now waiting for input:
[2018-03-14T14:48:03,546][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
Type any string in the console and press Enter; the message is echoed back immediately. The pipeline reads our input from stdin and writes it back to stdout:
Hello, this is my first event to logstash !
2018-03-14T06:50:26.261Z yf-beidou-dmp00.yf01.baidu.com Hello, this is my first event to logstash !
Logstash adds the host name and a timestamp to the message. To exit Logstash, press Ctrl+D;
Sending logs to Logstash via Filebeat
Configuring Filebeat
Filebeat tails file data on your servers. It is designed to be reliable and lightweight, so when deployed alongside business systems it avoids impacting their normal operation through resource consumption. A default Logstash installation includes the Beats input plugin, which receives events reported by the various Beats shippers. Filebeat can also report events directly to Elasticsearch, bypassing Logstash.
1) Download the sample file logstash-tutorial.log.gz, upload it to the server where Filebeat will run, and unpack it; this sample is the Apache web log provided by the official documentation.
2) Download Filebeat, upload it to the server, and unpack it; here we use the LINUX 64-BIT package;
tar -zxvf filebeat-6.2.2-linux-x86_64.tar.gz
cd filebeat-6.2.2-linux-x86_64
3) Configure Filebeat: edit filebeat.yml, setting the path of the log file to watch and the Logstash address to report to;
filebeat.prospectors:
- type: log
  # path of the log file(s) to watch
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  # address of the Logstash instance to report to
  hosts: ["localhost:5044"]
Note: the watched log file paths must be absolute; wildcards are supported;
4) Start Filebeat. The -e flag writes Filebeat's own log output to stderr instead of a log file; -c specifies the configuration file path; -d enables the given debug selectors;
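For example, to pick up every .log file under one directory (a hypothetical path, shown only to illustrate the wildcard support):

```yaml
paths:
  # hypothetical path; the glob matches every .log file in the directory
  - /var/log/myapp/*.log
```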
./filebeat -e -c filebeat.yml -d "publish"
Configuring Logstash
1) Create a pipeline configuration file; a pipeline configuration has the following skeleton:
# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}
Create a file named first-pipeline.conf with the following contents:
input {
    beats {
        # the port on which to listen for Beats events
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    # write events to stdout, using the rubydebug codec
    stdout { codec => rubydebug }
}
2) Verify the configuration with the command below. The --config.test_and_exit option parses the configuration file and reports any errors;
bin/logstash -f first-pipeline.conf --config.test_and_exit
Output like the following means the configuration file passed validation:
Configuration OK
[2018-03-14T16:23:08,919][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
3) Start Logstash with the command below. The --config.reload.automatic option reloads the configuration file automatically whenever it changes, so you do not have to restart Logstash;
bin/logstash -f first-pipeline.conf --config.reload.automatic
If everything is configured correctly, the Logstash console prints events like:
{
"host" => "yf-beidou-dmp00.yf01.baidu.com",
"offset" => 22310,
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"message" => "218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] \"GET /blog/geekery/xvfb-firefox.html HTTP/1.1\" 200 10975 \"-\" \"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
"prospector" => {
"type" => "log"
},
"beat" => {
"name" => "yf-beidou-dmp00.yf01.baidu.com",
"version" => "6.2.2",
"hostname" => "yf-beidou-dmp00.yf01.baidu.com"
},
"@timestamp" => 2018-03-14T08:43:52.564Z,
"source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
"@version" => "1"
}
Parsing logs with the Grok filter plugin
In the example above, logs reported by Filebeat pass through Logstash and are printed unchanged; next we add a filter stage to process them. grok is one of Logstash's filter plugins: it turns unstructured data into structured data according to predefined patterns, which makes the data easy to search. The patterns must be set up in advance to match the log format, so you need to know the format of the logs you collect. Since our sample is an Apache web log, we can use grok's built-in %{COMBINEDAPACHELOG} pattern directly; the structured output it produces is shown further below.
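To make the idea concrete, here is a rough Python sketch of what %{COMBINEDAPACHELOG} does: a hand-written regular expression applied to one line of the sample log. This is only an approximation of the real grok pattern, which is more permissive:

```python
import re

# Hand-written approximation of grok's %{COMBINEDAPACHELOG} pattern;
# each named group becomes a field on the structured event.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[^"]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\S+) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

line = ('218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] '
        '"GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 '
        '"-" "Sogou web spider/4.0'
        '(+http://www.sogou.com/docs/help/webmasters.htm#07)"')

fields = COMBINED.match(line).groupdict()
print(fields["clientip"], fields["verb"],
      fields["response"])  # 218.30.103.62 GET 200
```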
Modify the first-pipeline.conf file to add a filter, as follows:
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
output {
    stdout { codec => rubydebug }
}
Because Logstash was started with the --config.reload.automatic option, it picks up the modified configuration file automatically and no restart is needed. After saving the change, you can see the configuration being reloaded and the pipeline restarting in the logs:
[2018-03-14T17:04:57,325][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2018-03-14T17:05:01,758][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x58f295b9 run>"}
[2018-03-14T17:05:02,048][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-14T17:05:02,362][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-03-14T17:05:02,422][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x5506295b sleep>"}
[2018-03-14T17:05:02,429][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-03-14T17:05:02,453][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
To make Filebeat re-read the file from the beginning, stop Filebeat, delete the registry file that records read positions, and start Filebeat again:
cd filebeat-6.2.2-linux-x86_64
rm data/registry
./filebeat -e -c filebeat.yml -d "publish"
With the filter in place, the output looks like the following: besides the original message, the log line has now been parsed and split into the corresponding fields.
{
"host" => "yf-beidou-dmp00.yf01.baidu.com",
"clientip" => "121.107.188.202",
"verb" => "GET",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"message" => "121.107.188.202 - - [04/Jan/2015:05:27:57 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1\" 200 171717 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
"beat" => {
"name" => "yf-beidou-dmp00.yf01.baidu.com",
"version" => "6.2.2",
"hostname" => "yf-beidou-dmp00.yf01.baidu.com"
},
"httpversion" => "1.1",
"auth" => "-",
"response" => "200",
"bytes" => "171717",
"ident" => "-",
"@version" => "1",
"request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png",
"offset" => 21927,
"prospector" => {
"type" => "log"
},
"@timestamp" => 2018-03-14T09:06:47.927Z,
"source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
"timestamp" => "04/Jan/2015:05:27:57 +0000",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
"referrer" => "\"-\""
}
Enriching data with the Geoip filter plugin
geoip is another filter plugin; it derives location information from an IP address and adds it to the output event. geoip must be told which field holds the IP address; here we use the clientip field produced by the grok parsing above. Because filters run in the order they are configured, make sure the geoip filter comes after the grok filter configured earlier. The configuration file is as follows:
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        # the field containing the IP address to look up
        source => "clientip"
    }
}
output {
    stdout { codec => rubydebug }
}
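The order-sensitivity can be illustrated with a toy Python sketch (hypothetical functions, with a hard-coded lookup table standing in for the real GeoIP database): the second filter relies on a field that the first one adds.

```python
def grok_filter(event):
    # stand-in for grok: pull the client IP out of the raw message
    event["clientip"] = event["message"].split()[0]
    return event

def geoip_filter(event):
    # stand-in for geoip: a hard-coded table instead of the GeoIP database
    fake_db = {"218.30.103.62": {"city_name": "Beijing", "country_code2": "CN"}}
    event["geoip"] = fake_db.get(event["clientip"], {})
    return event

event = {"message": '218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] "GET ..."'}
for f in (grok_filter, geoip_filter):   # same order as in the config file
    event = f(event)
print(event["geoip"]["city_name"])      # Beijing
```

Swapping the two functions in the loop would leave geoip_filter without a clientip field to look up, which is exactly why geoip must come after grok in the config.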
Stop Filebeat, delete the data/registry file, restart Filebeat, and check the output again; the events now carry geographic location information:
{
"host" => "yf-beidou-dmp00.yf01.baidu.com",
"clientip" => "218.30.103.62",
"verb" => "GET",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"message" => "218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] \"GET /blog/geekery/xvfb-firefox.html HTTP/1.1\" 200 10975 \"-\" \"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
"beat" => {
"name" => "yf-beidou-dmp00.yf01.baidu.com",
"version" => "6.2.2",
"hostname" => "yf-beidou-dmp00.yf01.baidu.com"
},
"httpversion" => "1.1",
"auth" => "-",
"response" => "200",
"bytes" => "10975",
"ident" => "-",
"@version" => "1",
"request" => "/blog/geekery/xvfb-firefox.html",
"geoip" => {
"location" => {
"lat" => 39.9289,
"lon" => 116.3883
},
"latitude" => 39.9289,
"continent_code" => "AS",
"region_code" => "11",
"country_code3" => "CN",
"country_code2" => "CN",
"longitude" => 116.3883,
"city_name" => "Beijing",
"country_name" => "China",
"ip" => "218.30.103.62",
"region_name" => "Beijing",
"timezone" => "Asia/Shanghai"
},
"offset" => 22310,
"prospector" => {
"type" => "log"
},
"@timestamp" => 2018-03-14T09:22:31.299Z,
"source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
"timestamp" => "04/Jan/2015:05:28:43 +0000",
"agent" => "\"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
"referrer" => "\"-\""
}
Writing Logstash output to Elasticsearch
1) Change the output to write data to Elasticsearch: edit the first-pipeline.conf file and configure the Elasticsearch address;
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        # Elasticsearch addresses; with multiple addresses, Logstash
        # load-balances requests across them automatically
        hosts => [ "ip1:port1", "ip2:port2" ]
        # if a username and password are configured, set these two
        # fields; the user must have CRUD permissions on the index
        user => "username"
        password => "password"
    }
}
2) Resend the events: stop Filebeat, delete the data/registry file, and restart Filebeat;
3) Search the logs: run the command below to check whether logs were written to Elasticsearch; the IP and port are those of the Elasticsearch instance. Because first-pipeline.conf does not specify an index name format, the default logstash-yyyy.MM.dd is used (replace the date part); if no username/password is configured, omit the -u option;
curl -XGET 'ip:port/logstash-2018.03.14/_search?pretty&q=response=200' -u username:password
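The URI query q=response=200 above can also be expressed, roughly equivalently, as a request body in Elasticsearch's JSON query DSL (sent to the same _search endpoint):

```json
{
  "query": {
    "match": { "response": "200" }
  }
}
```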
The search returns a JSON document like:
{
"took" : 20,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 98,
"max_score" : 2.3988724,
"hits" : [
{
"_index" : "logstash-2018.03.14",
"_type" : "doc",
"_id" : "t76VJGIBe9U4s2F_OL_W",
"_score" : 2.3988724,
"_source" : {
"verb" : "GET",
"@timestamp" : "2018-03-14T12:56:19.779Z",
"response" : "200",
"agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"source" : "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
"host" : "yf-beidou-dmp00.yf01.baidu.com",
"auth" : "-",
"clientip" : "83.149.9.216",
"geoip" : {
"country_code3" : "RU",
"continent_code" : "EU",
"ip" : "83.149.9.216",
... ...
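Since the default index name from step 3 carries the event date, the index to query for a given day can be computed with a small Python sketch:

```python
from datetime import datetime, timezone

def logstash_index(ts):
    # default pattern: logstash-yyyy.MM.dd, taken from the event's @timestamp
    return ts.strftime("logstash-%Y.%m.%d")

print(logstash_index(datetime(2018, 3, 14, tzinfo=timezone.utc)))
# logstash-2018.03.14
```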
4) In Kibana, configure an index pattern, then search the logs via Discover:
Open the menu: Management -> Index Pattern;
Click Create Index Pattern to create the index pattern;
Open the menu: Discover to search the logs;
References
Official video: Getting Started with Logstash
Official docs: Getting Started with Logstash