Logstash 入门
Logstash 是什么
Logstash 就是一个开源的数据流工具,它会做三件事:
1.从数据源拉取数据2.对数据进行过滤、转换等处理3.将处理后的数据写入目标地
例如:
•监听某个目录下的日志文件,读取文件内容,处理数据,写入 influxdb 。•从 kafka 中消费消息,处理数据,写入 elasticsearch 。
为什么要用 Logstash ?
方便省事。
假设你需要从 kafka 中消费数据,然后写入 elasticsearch ,如果自己编码,你得去对接 kafka 和 elasticsearch 的 API 吧,如果你用 Logstash ,这部分就不用自己去实现了,因为 Logstash 已经为你封装了对应的 plugin
插件,你只需要写一个配置文件形如:
input { kafka { # kafka consumer 配置 } } filter { # 数据处理配置 } output { elasticsearch { # elasticsearch 输出配置 } }
然后运行 logstash 就可以了。
Logstash 提供了两百多个封装好的 plugin
插件,这些插件被分为三类:
•input plugin
: 从哪里拉取数据•filter plugin
: 数据如何处理•output plugin
: 数据写入何处
使用 logstash 你只要编写一个配置文件,在配置文件中挑选组合这些 plugin
插件,就可以轻松实现数据从输入源到输出源的实时流动。
安装 logstash
请参数:官方文档
第一个示例
假设你已经安装好了 logstash ,并且可执行文件的路径已经加入到了 PATH 环境变量中。
下面开始我们的第一个示例,编写 pipeline.conf
文件,内容为:
input { stdin { } } filter { } output { stdout { } }
这个配置文件的含义是:
•input
输入为 stdin
(标准输入)•filter
为空(也就是不进行数据的处理)•output
输出为 stdout
(标准输出)
执行命令:
logstash -f pipeline.conf
等待 logstash 启动完毕,输入 hello world 然后回车, 你就会看到以下输出内容:
{ "message" => "hello world", "@version" => "1", "@timestamp" => 2020-11-01T08:25:10.987Z, "host" => "local" }
我们输入的内容已经存在于 message
字段中了。
当你输入其他内容后也会看到类似的输出。
至此,我们的第一个示例已经完成,正如配置文件中所定义的,Logstash 从 stdin 标准输入读取数据,不对源数据做任何处理,然后输出到 stdout 标准输出。
特定名词和字段
•event
: 数据在 logstash 中被包装成 event
事件的形式从 input 到 filter 再到 output 流转。•@timestamp
: 特殊字段,标记 event 发生的时间。•@version
: 特殊字段,标记 event 的版本号。•message
: 源数据内容。•@metadata
: 元数据,key/value 的形式,是否有数据得看具体插件,例如 kafka 的 input 插件会在 @metadata
里记录 topic、consumer_group、partition、offset 等一些元数据。•tags
: 记录 tag 的字符串数组。
字段引用
在配置文件中,可以通过 [field]
的形式引用字段内容,如果在字符串中,则可以通过 %{[field]}
的方式进行引用。
示例:
input { kafka { # kafka 配置 } } filter { # 引用 log_level 字段的内容进行判断 if [log_level] == "debug" { } } output { elasticsearch { # %{+yyyy.MM.dd} 来源于 @timestamp index => "log-%{+yyyy.MM.dd}" document_type => "_doc" document_id => "%{[@metadata][kafka][key]}" hosts => ["127.0.0.1:9200"] } }
Plugin 插件一览
用好 Logstash 的第一步就是熟悉 plugin 插件,只有熟悉了这些插件你才能快速高效的建立数据管道。
Input plugin
Input
插件定义了数据源,即 logstash 从哪里拉取数据。
•beats
: 从 Elastic Beats 框架中接收数据。
示例:
input { beats { port => 5044 } }
dead_letter_queue
: 从 Logstash 自己的 dead letter queue 中拉取数据,目前 dead letter queue 只支持记录 output 为 elasticsearch 时写入 400 或 404 的数据。
示例:
input { dead_letter_queue { path => "/var/logstash/data/dead_letter_queue" start_timestamp => "2017-04-04T23:40:37" } }
elasticsearch
: 从 elasticsearch 中读取 search query 的结果。
示例
nput { elasticsearch { hosts => "localhost" query => '{ "query": { "match": { "statuscode": 200 } } }' } }