Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中
Logstash 是一个开源数据收集引擎,具有实时流水线功能。Logstash 可以动态统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。
安装 Logstash
从下载的二进制文件安装
Logstash 二进制文件可从 https://www.elastic.co/cn/downloads/logstash。
下载适用于您的主机环境的 Logstash 安装文件 - TARG.GZ、DEB、ZIP 或 RPM。
启动logstash
从控制台进行测试
输入 stdin 输出 stdout
cd logstash-7.13.4
./bin/logstash -e 'input { stdin { } } output { stdout {} }'
此时控制台作为输入流输入任何内容回车,输出流会输出到控制台。
指定配置文件
通过 -f 指定配置文件
通过 --config.reload.automatic 自动重新加载配置文件
bin/logstash -f first-pipeline.conf --config.reload.automatic
配置文件主要有3部分 input filter output
Codec plugins
codec插件可以在input、output流中处理数据,更改数据格式。
常用的codec有
- json 读取 JSON 格式的内容,为 JSON 数组中的每个元素创建一个事件
- json_lines 读取以换行符分隔的 JSON
- plain 读取纯文本,事件之间没有分隔
- mutiline 将多行消息合并为一个事件
将kafka日志消息输入到logstash
topics指定监听的topic
json将消息转为json格式
input {
kafka {
id => "my_plugin_id"
bootstrap_servers => "localhost:9092"
topics => ["logger-channel"]
auto_offset_reset => "latest"
}
}
filter {
#json
json {
source => "message"
}
date {
match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
remove_field => ["time"]
}
}
output {
stdout {}
}
启动服务
./bin/logstash -f ./config/kafka-std.conf --config.reload.automatic
控制台接收到kafka的日志消息
{
"logger" => "com.paw.kafka.elk.controller.KafkaLogController",
"@version" => 1,
"thread" => "http-nio-8080-exec-7",
"levelVal" => 20000,
"@timestamp" => 2021-08-01T07:10:27.273Z,
"appName" => "paw-kelk",
"message" => "cost time: 23",
"env" => "dev",
"caller" => {
"file" => "KafkaLogController.java",
"method" => "kafka",
"class" => "com.paw.kafka.elk.controller.KafkaLogController",
"line" => 35
},
"level" => "INFO"
}
{
"logger" => "com.paw.kafka.elk.controller.KafkaLogController",
"@version" => 1,
"thread" => "http-nio-8080-exec-7",
"levelVal" => 10000,
"@timestamp" => 2021-08-01T07:10:27.273Z,
"appName" => "paw-kelk",
"message" => "debug time: 23",
"env" => "dev",
"caller" => {
"file" => "KafkaLogController.java",
"method" => "kafka",
"class" => "com.paw.kafka.elk.controller.KafkaLogController",
"line" => 36
},
"level" => "DEBUG"
}
至此kafka日志写入logstash完成。logsstash作为kafka日志topic的一个消费端,kafka将日志发往logstash,logtash以输入流方式结束日志数据,经过filter加工处理输出到输出流如elasticsearch中。