ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
总体架构说明
环境说明
host | hostname | 运行软件 |
192.168.179.123 | node-5 | kafka、zookeeper、es、logstash、filebeat |
192.168.179.124 | node-4 | kafka、zookeeper、es、kibana、filebeat |
192.168.179.125 | node-3 | kafka、zookeeper |
以上是我的节点及机器上安装的相关软件,东西会比较多,但是运行都离不开我们上面的架构图。
日志系统各个模块配置
我这里写出跟日志系统相关的配置,es
、kibana
这个就不贴了,前面有的。zookeeper
、kafka
配置好集群就可以啦。可以从 kafka高可用集群搭建 这篇文章里面找到搭建配置
logstash配置
新建一个 kafka
的 logstash
配置
- vi conf.d/kafka.conf
input{ kafka { codec => json topics => "kafka_topic" bootstrap_servers => "192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092" } } output{ if [fields][logsource] == "nginx" { # 根据filebeat里面的fields -> logsource 来确定是什么服务的日志 elasticsearch{ hosts => ["192.168.179.124:9200", "192.168.179.123:9200"] #elasticsearch服务器地址 user => "elastic" password => "${ES_PWD}" index =>"kafka-nginx-log-%{+YYYY.MM.dd}.log" } } if [fields][logsource] == "mysql" { elasticsearch{ hosts => ["192.168.179.124:9200", "192.168.179.123:9200"] #elasticsearch服务器地址 user => "elastic" password => "${ES_PWD}" index =>"kafka-mysql-log-%{+YYYY.MM.dd}.log" } } }
filebeat配置
vi filebeat.yml (node-5的配置)
filebeat.inputs: - type: log enabled: true paths: - /var/log/test-filebeat.log fields: # 自定义字段,可以用于区分日志 logsource: mysql output.kafka: hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"] topic: "kafka_topic" partition.round_robin: # 开启kafka的partition分区 reachable_only: false required_acks: 1 compression: gzip #压缩格式 max_message_bytes: 1000000 #压缩格式字节大小
vi filebeat.yml (node-4的配置)
filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access.log fields: logsource: nginx output.kafka: hosts: ["192.168.179.123:9092","192.168.179.124:9092","192.168.179.125:9092"] topic: "kafka_topic" partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
启动模块
- 完成好各个阶段的配置后,我们逐步对模块进行启动
# 对es节点进行启动 (node-4、node-5) su - elastic -c "/home/test/elasticsearch-7.6.0/bin/elasticsearch -d" # 启动kibana (node-4) sudo nohup ./bin/kibana --allow-root # 启动kafka集群 (node-4、node-5) ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties ./bin/kafka-server-start.sh -daemon ./config/server.properties # 创建用于消息传输的kafka topic ./bin/kafka-topics.sh --create --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --replication-factor 3 --partitions 1 --topic kafka_topic # 查看topic是否正常 ./bin/kafka-topics.sh --describe --bootstrap-server 192.168.179.123:9092,192.168.179.124:9092,192.168.179.125:9092 --topic kafka_topic # 启动filebeat (node-4、node-5) ./filebeat -e -c filebeat.yml
这个时候 filebeat
开始对日志进行收集,然后传输到 kafka
的 topic
,我们利用 kafkatool
查看一下 kafka_topic
数据
已经可以看到有数据进来了,然后我们启动 logstash
对 kafka_topic
进行消费,输出到我们的 es
集群
sudo ./bin/logstash -f ./conf.d/kafka.conf
logstash
正常启动后,已经可以通过 kibana
查看到索引信息了, filebeat
配置了 [fields][logsource]
用来创建不同应用的索引。我们这里有 nginx
和 mysql
的索引,我们可视化看看
kibana查看索引
总结
我知道这个 日志系统
待优化的地方很多,这只是我们简单搭建起来,让大家对日志系统有一个整体的认识,到这,日志系统的搭建就完结了。希望大家继续学习呀!