ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统

总体架构说明

环境说明

host hostname 运行软件
192.168.179.123 node-5 kafka、zookeeper、es、logstash、filebeat
192.168.179.124 node-4 kafka、zookeeper、es、kibana、filebeat
192.168.179.125 node-3 kafka、zookeeper

以上是我的节点及机器上安装的相关软件,东西会比较多,但是运行都离不开我们上面的架构图

日志系统各个模块配置

我这里写出跟日志系统相关的配置,eskibana这个就不贴了,前面有的。zookeeperkafka 配置好集群就可以啦。可以从 kafka高可用集群搭建 这篇文章里面找到搭建配置

logstash配置

新建一个 kafkalogstash 配置

  • vi conf.d/kafka.conf
input{
    kafka {
        codec => json
        topics => "kafka_topic"
        bootstrap_servers => "192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092"
    }
}
output{
    if [fields][logsource] == "nginx" {   # 根据filebeat里面的fields -> logsource 来确定是什么服务的日志
        elasticsearch{
            hosts => ["192.168.179.124:9200", "192.168.179.123:9200"]        #elasticsearch服务器地址
            user => "elastic"
            password => "${ES_PWD}"
            index =>"kafka-nginx-log-%{+YYYY.MM.dd}.log"
        }
    }
    if [fields][logsource] == "mysql" {
        elasticsearch{
            hosts => ["192.168.179.124:9200", "192.168.179.123:9200"]        #elasticsearch服务器地址
            user => "elastic"
            password => "${ES_PWD}"
            index =>"kafka-mysql-log-%{+YYYY.MM.dd}.log"
        }
    }
}

filebeat配置

  • vi filebeat.yml (node-5的配置)
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/test-filebeat.log
  fields:   # 自定义字段,可以用于区分日志
      logsource: mysql  
output.kafka:
  hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"]
  topic: "kafka_topic"
  partition.round_robin: # 开启kafka的partition分区
    reachable_only: false
  required_acks: 1
  compression: gzip #压缩格式
  max_message_bytes: 1000000 #压缩格式字节大小
  • vi filebeat.yml (node-4的配置)
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
      logsource: nginx
output.kafka:
  hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"]
  topic: "kafka_topic"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

启动模块

  • 完成好各个阶段的配置后,我们逐步对模块进行启动
# 对es节点进行启动 (node-4、node-5)
su - elastic -c "/home/test/elasticsearch-7.6.0/bin/elasticsearch -d"
# 启动kibana (node-4)
sudo nohup ./bin/kibana --allow-root
# 启动kafka集群 (node-4、node-5)
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties
# 创建用于消息传输的kafka topic
./bin/kafka-topics.sh --create --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --replication-factor 3 --partitions 1 --topic kafka_topic
# 查看topic是否正常
./bin/kafka-topics.sh --describe --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --topic kafka_topic
# 启动filebeat (node-4、node-5)
./filebeat -e -c filebeat.yml

这个时候 filebeat 开始对日志进行收集,然后传输到 kafkatopic ,我们利用 kafkatool 查看一下 kafka_topic 数据


已经可以看到有数据进来了,然后我们启动 logstashkafka_topic 进行消费,输出到我们的 es 集群

sudo ./bin/logstash -f ./conf.d/kafka.conf

logstash 正常启动后,已经可以通过 kibana 查看到索引信息了, filebeat 配置了 [fields][logsource] 用来创建不同应用的索引。我们这里有 nginxmysql 的索引,我们可视化看看

kibana查看索引


总结

我知道这个 日志系统 待优化的地方很多,这只是我们简单搭建起来,让大家对日志系统有一个整体的认识,到这,日志系统的搭建就完结了。希望大家继续学习呀!

目录
相关文章
|
7月前
|
消息中间件 Java Kafka
搭建ELK日志收集,保姆级教程
本文介绍了分布式日志采集的背景及ELK与Kafka的整合应用。传统多服务器环境下,日志查询效率低下,因此需要集中化日志管理。ELK(Elasticsearch、Logstash、Kibana)应运而生,但单独使用ELK在性能上存在瓶颈,故结合Kafka实现高效的日志采集与处理。文章还详细讲解了基于Docker Compose构建ELK+Kafka环境的方法、验证步骤,以及如何在Spring Boot项目中整合ELK+Kafka,并通过Logback配置实现日志的采集与展示。
1217 64
搭建ELK日志收集,保姆级教程
|
数据可视化 关系型数据库 MySQL
ELK实现nginx、mysql、http的日志可视化实验
通过本文的步骤,你可以成功配置ELK(Elasticsearch, Logstash, Kibana)来实现nginx、mysql和http日志的可视化。通过Kibana,你可以直观地查看和分析日志数据,从而更好地监控和管理系统。希望这些步骤能帮助你在实际项目中有效地利用ELK来处理日志数据。
911 90
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
523 4
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
526 2
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
995 1
|
网络安全
window系统下安装elk
本文介绍了Elasticsearch、Logstash和Kibana(统称ELK栈)8.17.3版本的安装与配置流程。主要内容包括: - **Elasticsearch**:详细描述了从下载到启动服务的步骤,以及`elasticsearch.yml`的关键配置项,并提供了Postman操作示例及常见问题解决方案。 - **Logstash**:涵盖了插件安装、配置文件`logstash.conf`编写及其启动命令。 - **Kibana**:讲解了下载、配置`kibana.yml`和启动过程,确保与Elasticsearch正确连接。
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3326 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
存储 监控 安全
下一篇
开通oss服务