ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

总体架构说明

环境说明

host hostname 运行软件
192.168.179.123 node-5 kafka、zookeeper、es、logstash、filebeat
192.168.179.124 node-4 kafka、zookeeper、es、kibana、filebeat
192.168.179.125 node-3 kafka、zookeeper

以上是我的节点及机器上安装的相关软件,东西会比较多,但是运行都离不开我们上面的架构图

日志系统各个模块配置

我这里写出跟日志系统相关的配置,eskibana这个就不贴了,前面有的。zookeeperkafka 配置好集群就可以啦。可以从 kafka高可用集群搭建 这篇文章里面找到搭建配置

logstash配置

新建一个 kafkalogstash 配置

  • vi conf.d/kafka.conf
input{
    kafka {
        codec => json
        topics => "kafka_topic"
        bootstrap_servers => "192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092"
    }
}
output{
    if [fields][logsource] == "nginx" {   # 根据filebeat里面的fields -> logsource 来确定是什么服务的日志
        elasticsearch{
            hosts => ["192.168.179.124:9200", "192.168.179.123:9200"]        #elasticsearch服务器地址
            user => "elastic"
            password => "${ES_PWD}"
            index =>"kafka-nginx-log-%{+YYYY.MM.dd}.log"
        }
    }
    if [fields][logsource] == "mysql" {
        elasticsearch{
            hosts => ["192.168.179.124:9200", "192.168.179.123:9200"]        #elasticsearch服务器地址
            user => "elastic"
            password => "${ES_PWD}"
            index =>"kafka-mysql-log-%{+YYYY.MM.dd}.log"
        }
    }
}

filebeat配置

  • vi filebeat.yml (node-5的配置)
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/test-filebeat.log
  fields:   # 自定义字段,可以用于区分日志
      logsource: mysql  
output.kafka:
  hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"]
  topic: "kafka_topic"
  partition.round_robin: # 开启kafka的partition分区
    reachable_only: false
  required_acks: 1
  compression: gzip #压缩格式
  max_message_bytes: 1000000 #压缩格式字节大小
  • vi filebeat.yml (node-4的配置)
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
      logsource: nginx
output.kafka:
  hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"]
  topic: "kafka_topic"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

启动模块

  • 完成好各个阶段的配置后,我们逐步对模块进行启动
# 对es节点进行启动 (node-4、node-5)
su - elastic -c "/home/test/elasticsearch-7.6.0/bin/elasticsearch -d"
# 启动kibana (node-4)
sudo nohup ./bin/kibana --allow-root
# 启动kafka集群 (node-4、node-5)
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties
# 创建用于消息传输的kafka topic
./bin/kafka-topics.sh --create --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --replication-factor 3 --partitions 1 --topic kafka_topic
# 查看topic是否正常
./bin/kafka-topics.sh --describe --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --topic kafka_topic
# 启动filebeat (node-4、node-5)
./filebeat -e -c filebeat.yml

这个时候 filebeat 开始对日志进行收集,然后传输到 kafkatopic ,我们利用 kafkatool 查看一下 kafka_topic 数据


已经可以看到有数据进来了,然后我们启动 logstashkafka_topic 进行消费,输出到我们的 es 集群

sudo ./bin/logstash -f ./conf.d/kafka.conf

logstash 正常启动后,已经可以通过 kibana 查看到索引信息了, filebeat 配置了 [fields][logsource] 用来创建不同应用的索引。我们这里有 nginxmysql 的索引,我们可视化看看

kibana查看索引


总结

我知道这个 日志系统 待优化的地方很多,这只是我们简单搭建起来,让大家对日志系统有一个整体的认识,到这,日志系统的搭建就完结了。希望大家继续学习呀!

目录
相关文章
|
1月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
82 7
|
3月前
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
1月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
56 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
1月前
|
存储 监控 安全
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
4月前
|
消息中间件 Kafka 开发工具
rsyslog+ELK收集Cisco日志
rsyslog+ELK收集Cisco日志
|
4月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
4月前
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
127 2
|
4月前
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息
|
4月前
|
消息中间件 Java Kafka
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息