kafka日志写入logstash

简介: kafka日志写入logstash

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中

Logstash 是一个开源数据收集引擎,具有实时流水线功能。Logstash 可以动态统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。

basic_logstash_pipeline.png

安装 Logstash

从下载的二进制文件安装

Logstash 二进制文件可从 https://www.elastic.co/cn/downloads/logstash

下载适用于您的主机环境的 Logstash 安装文件 - TARG.GZ、DEB、ZIP 或 RPM。

启动logstash

从控制台进行测试
输入 stdin 输出 stdout

cd logstash-7.13.4 
./bin/logstash -e 'input { stdin { } } output { stdout {} }'

此时控制台作为输入流输入任何内容回车,输出流会输出到控制台。

指定配置文件

通过 -f 指定配置文件
通过 --config.reload.automatic 自动重新加载配置文件

bin/logstash -f first-pipeline.conf --config.reload.automatic

配置文件主要有3部分 input filter output

Codec plugins

codec插件可以在input、output流中处理数据,更改数据格式。

常用的codec有

  • json 读取 JSON 格式的内容,为 JSON 数组中的每个元素创建一个事件
  • json_lines 读取以换行符分隔的 JSON
  • plain 读取纯文本,事件之间没有分隔
  • mutiline 将多行消息合并为一个事件

将kafka日志消息输入到logstash

topics指定监听的topic
json将消息转为json格式

input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "localhost:9092"
    topics => ["logger-channel"]
    auto_offset_reset => "latest"
  }
}

filter {
  #json
    json {
        source => "message"
    }
    date {
        match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
        remove_field => ["time"]
    }
}

output {
  stdout {}
}

启动服务

./bin/logstash -f ./config/kafka-std.conf --config.reload.automatic

控制台接收到kafka的日志消息

{
        "logger" => "com.paw.kafka.elk.controller.KafkaLogController",
      "@version" => 1,
        "thread" => "http-nio-8080-exec-7",
      "levelVal" => 20000,
    "@timestamp" => 2021-08-01T07:10:27.273Z,
       "appName" => "paw-kelk",
       "message" => "cost time: 23",
           "env" => "dev",
        "caller" => {
          "file" => "KafkaLogController.java",
        "method" => "kafka",
         "class" => "com.paw.kafka.elk.controller.KafkaLogController",
          "line" => 35
    },
         "level" => "INFO"
}
{
        "logger" => "com.paw.kafka.elk.controller.KafkaLogController",
      "@version" => 1,
        "thread" => "http-nio-8080-exec-7",
      "levelVal" => 10000,
    "@timestamp" => 2021-08-01T07:10:27.273Z,
       "appName" => "paw-kelk",
       "message" => "debug time: 23",
           "env" => "dev",
        "caller" => {
          "file" => "KafkaLogController.java",
        "method" => "kafka",
         "class" => "com.paw.kafka.elk.controller.KafkaLogController",
          "line" => 36
    },
         "level" => "DEBUG"
}

至此kafka日志写入logstash完成。logsstash作为kafka日志topic的一个消费端,kafka将日志发往logstash,logtash以输入流方式结束日志数据,经过filter加工处理输出到输出流如elasticsearch中。

相关文章
|
4月前
|
Java 应用服务中间件 nginx
微服务框架(二十九)Logstash Nginx 日志上报
此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。 本文为Logstash Nginx 日志上报 本系列文章中所使用的框架版本为Spring Boot 2.0.3...
|
5月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
4月前
|
Dubbo Java 应用服务中间件
微服务框架(三十)Logstash Kong 日志上报
此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。 本文为Logstash Kong 日志上报 本系列文章中所使用的框架版本为Spring Boot 2.0.3-...
|
4月前
|
JSON Java 数据格式
微服务框架(十三)Spring Boot Logstash日志采集
  本文为Spring Boot中Log4j2对接Logstash,进行日志采集。Logstah只支持log4j,使用log4j2时需要通过TCP插件调用 此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。
|
5月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
7月前
|
存储 NoSQL Redis
容器部署日志分析平台ELK7.10.1(Elasisearch+Filebeat+Redis+Logstash+Kibana)
容器部署日志分析平台ELK7.10.1(Elasisearch+Filebeat+Redis+Logstash+Kibana)
155 0
|
7月前
|
NoSQL Redis 索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
102 0
|
4月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
154 0
|
7天前
|
数据采集 监控 数据可视化
日志解析神器——Logstash中的Grok过滤器使用详解
日志解析神器——Logstash中的Grok过滤器使用详解
30 4
|
6月前
|
消息中间件 Kafka 网络安全
淘东电商项目(49) -ELK+Kafka分布式日志收集(docker下搭建kafka)
淘东电商项目(49) -ELK+Kafka分布式日志收集(docker下搭建kafka)
62 0