打开微信扫一扫,关注微信公众号【数据与算法联盟】
转载请注明出处: http://blog.csdn.net/gamer_gyt
博主微博: http://weibo.com/234654758
Github: https://github.com/thinkgamer
写在前边的话
Logstash在ELK这个技术栈中占据着重要的位置,所有的数据都要经过logstash的解析,才能格式化的存入ES中,那么对于Logstash的学习也是十分重要的,今天这篇文章我们将看一下logstash的基本知识,后续会有更多细节性的文章和使用案例文章,请持续关注博主和ELK Stack 从入门到放弃专栏。
环境准备
以下在ubuntu16.04上进行
1:Java
sudo apt install openjdk-8-jdk
2:logstash安装
直接在官网下载最新的安装包:https://www.elastic.co/downloads/logstash
deb包安装(ubuntu):sudo dpkg -i logstash-5.2.2.deb
rpm包(Centos):sudo rpm -ivh logstash-5.2.2.deb
tar.gz包后者zip包:解压到指定的目录即可
Hello World
我的环境是Ubuntu16.04 / java 1.8 ELK版本均为5.2.1
sudo /usr/share/logstash/bin/logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}’
启动之后随便输入字符串即可看到:
thinkgamer@thinkgamer:/usr/share/logstash$ sudo /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
[sudo] thinkgamer 的密码:
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs to console
17:34:42.881 [LogStash::Runner] INFO logstash.agent - No persistent UUID file found. Generating new UUID {:uuid=>"04b030e9-d4ba-4414-b2ad-42df3a714a81", :path=>"/usr/share/logstash/data/uuid"}
The stdin plugin is now waiting for input:
17:34:43.775 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
17:34:43.864 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
17:34:44.040 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
hello world
{
"@timestamp" => 2017-03-27T09:35:46.959Z,
"@version" => "1",
"host" => "thinkgamer",
"message" => "hello world"
}
Logstash语法简介
logstash设计了自己的DSL,包括区域,注释,数据类型,条件判断和字段引用
区域(section):
Logstash 用 {} 来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。示例如下:
input {
stdin {}
syslog {}
}
数据类型:
logstasj支持少量的数据类型
bool :debug => true
string: host => “hostname”
number: port => 5044
array: match => [“datetime” , “unitime”]
hash: options => { key1 => “value1”, key2 => “value2” }
字段引用(field reference):
logstash字段引用语法。要在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 [] 里就行了,这就叫字段引用。还需注意字段层次。如果引用的是一个顶级字段,可以省略[],直接指定字段名。要引用嵌套的字段,需要指定完整的路径,如[top-level field][nested field]。
下面有五个顶级字段(agent, ip, request, response, ua) 和三个嵌套字段 (status, bytes, os)。
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
"ip": "192.168.24.44",
"request": "/index.html"
"response": {
"status": 200,
"bytes": 52353
},
"ua": {
"os": "<a href="http://www.ttlsa.com/windows/" title="windows"target="_blank">Windows</a> 7"
}
}
为了引用os字段,需指定[ua][os]。引用顶级字段如request,可以简单指定request即可。
字段引用格式也可以用于logstash调用sprintf格式。这种格式可以从其他字符串中引用字段值。如:
output {
statsd {
increment => "apache.%{[response][status]}"
}
}
也可以格式化时间,如
output {
file {
path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}"
}
}
条件判断
表达式支持下面这些操作符:
● ==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于)
● =~(匹配正则), !~(不匹配正则)
● in(包含), not in(不包含)
● and(与), or(或), nand(非与), xor(非或)
● ()(复合表达式), !()(对复合表达式结果取反)
命令行参数
-e :
即我们看到的hello world的测试
-f/–config :
意即文件。真实运用中,我们会写很长的配置,甚至可能超过 shell 所能支持的 1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。
此外,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有 *.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
注意:
logstash 列出目录下所有文件时,是字母排序的。而 logstash 配置段的 filter 和 output 都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用
-t/–configtest :
测试,用来测试logstash读取的conf文件是否能够正确的解析
-l/–log :
默认输出日志到标准错误,在生产环境中你可以通过 >bin/logstash -l logs/logstash.log 命令来统一存储日志
-w/–pipeline-workers :
允许filter和output的pipeline线程数量,默认是CPU核数
-b/–pipeline-batch-size :
每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存。
-u/–pipeline-batch-delay :
每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms。
-P/–pluginpath :
可以写自己的插件,然后用 bin/logstash –pluginpath /path/to/own/plugins 加载它们
–verbose :
输出一定的调试日志
–debug:
输出更多的调试日志,在logstash 5.0 开始,提供了logstash.yaml配置文件,可以将命令行的所有参数都通过YAML文件方式设置
input/filter/output
在hello world中已经见识了默认的logstash运行流程和配置的基础语法,配置文件中主要包括三部分,input,filter,output,下边将会分别来看这三部分
输入插件input:
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos。
读取文件(File)
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
这里有一些配置:
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:
● discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
● exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
● close_older
一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时。
● ignore_older
在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天。
● sincedb_path
如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。
● sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
● stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
● start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 “beginning”,logstash 进程就从头开始读取,类似 less +F 的形式运行。
标准输入(stdin)
stdin应该是logstash最简单最基础的插件了
eg:
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
解释:
type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用户应该是这样的
input {
stdin {
type => "web"
tags => ["add1","add2"]
}
}
filter {
if [type] == "web" {
}
}
output {
if "_grokparsefailure" in [tags] {
} else {
}
}
读取Syslog数据
syslog会帮助你很容易的手收集设备上的数据,而且配置也很简单,http://blog.csdn.net/gamer_gyt/article/details/54025857 这篇文章中对linux的syslog配置介绍的比较清晰,那么如何在logstash中配置呢?
eg:
input {
syslog {
port => "514"
}
}
Logstash 是用 UDPSocket, TCPServer 和 LogStash::Filters::Grok 来实现 LogStash::Inputs::Syslog的。所以你其实可以直接用 logstash 配置实现一样的效果
input {
tcp {
port => "8514"
}
udp {
port => "8515"
}
}
建议在使用 LogStash::Inputs::Syslog 的时候走 TCP 协议来传输数据,因为具体实现中,UDP 监听器只用了一个线程,而 TCP 监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。
注意:虽然 LogStash::Inputs::TCP 用 Ruby 的 Socket 和 OpenSSL 库实现了高级的 SSL 功能,但 Logstash 本身只能在 SizedQueue 中缓存 20 个事件。所以建议在生产环境中换用其他消息队列。
编码插件codec介绍
Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等。
- json:用来解析输入数据是json格式的
- multiline:用来解析一个事件可能是多行的内容
过滤器插件 filter
filter主要使用来解析message的,在filter中所有的DSL是按顺序执行的,这里边的东西太多,主要包括grok 解析,数据修改等,这个在稍后的文章中会做详细解释
输出插件 output
1:输出到elasticsearch
eg:
output {
elasticsearch {
hosts => ["192.168.0.2:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
flush_size => 20000
idle_flush_time => 10
sniffing => true
template_overwrite => true
}
}
解释:
批量发送
在过去的版本中,主要由本插件的 flush_size 和 idle_flush_time 两个参数共同控制 Logstash 向 Elasticsearch 发送批量数据的行为。以上面示例来说:Logstash 会努力攒到 20000 条数据一次性发送出去,但是如果 10 秒钟内也没攒够 20000 条,Logstash 还是会以当前攒到的数据量发一次。默认情况下,flush_size 是 500 条,idle_flush_time 是 1 秒。这也是很多人改大了 flush_size 也没能提高写入 ES 性能的原因——Logstash 还是 1 秒钟发送一次。从 5.0 开始,这个行为有了另一个前提:flush_size 的大小不能超过 Logstash 运行时的命令行参数设置的 batch_size,否则将以 batch_size 为批量发送的大小。
索引名
写入的 ES 索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash 提供了 %{+YYYY.MM.dd} 这种写法。在语法解析的时候,看到以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……
此外,注意索引名中不能有大写字母,否则 ES 在日志中会报 InvalidIndexNameException,但是 Logstash 不会报错,这个错误比较隐晦,也容易掉进这个坑中。
轮询
Logstash 1.4.2 在 transport 和 http 协议的情况下是固定连接指定 host 发送数据。从 1.5.0 开始,host 可以设置数组,它会从节点列表中选取不同的节点发送数据,达到 Round-Robin 负载均衡的效果。
2:stdout 输出
eg:
output {
stdout {
codec => rubydebug
workers => 2
}
}
解释:
输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备。
其次是 codec 设置。codec 的作用在之前已经讲过。可能除了 codecs/multiline ,其他 codec 插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。换句话说。上面配置示例的完全写法应该是:
output {
stdout {
codec => rubydebug {
}
workers => 2
}
}
单就 outputs/stdout 插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数 -vv 运行,查看更多详细调试信息。
3:输出到hdfs
具体可参照这个 https://github.com/dstore-dbap/logstash-output-webhdfs-discontinued
4:输出到文件中
通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。早年的 scribed ,甚至直接就把输出的语法命名为 。Logstash 当然也能做到这点。
和 LogStash::Inputs::File 不同, LogStash::Outputs::File 里可以使用 sprintf format 格式来自动定义输出到带日期命名的路径。
eg:
output {
file {
path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz"
message_format => "%{message}"
gzip => true
}
}
解释:
使用 output/file 插件首先需要注意的就是 message_format 参数。插件默认是输出整个 event 的 JSON 形式数据的。这可能跟大多数情况下使用者的期望不符。大家可能只是希望按照日志的原始格式保存就好了。所以需要定义为 %{message},当然,前提是在之前的 filter 插件中,你没有使用 remove_field 或者 update 等参数删除或修改 %{message} 字段的内容。
另一个非常有用的参数是 gzip。gzip 格式是一个非常奇特而友好的格式。其格式包括有:
● 10字节的头,包含幻数、版本号以及时间戳
● 可选的扩展头,如原文件名
● 文件体,包括DEFLATE压缩的数据
● 8字节的尾注,包括CRC-32校验和以及未压缩的原始数据长度