Integrating ELFK with ZooKeeper and Kafka

1. ZooKeeper cluster deployment
Environment: Ubuntu 22.04
10.0.0.71 elk71
10.0.0.72 elk72
10.0.0.73 elk73

1.1 Download and extract the zookeeper-3.8.4 package
[root@elk71 ~]# wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
[root@elk71 ~]# tar xf apache-zookeeper-3.8.4-bin.tar.gz -C /Project/softwares/
1.2 Configure environment variables
[root@elk71 ~]# cat /etc/profile.d/zk.sh
#!/bin/bash
export JAVA_HOME=/usr/share/elasticsearch/jdk
export ZK_HOME=/Project/softwares/apache-zookeeper-3.8.4-bin
export PATH=$PATH:$ZK_HOME/bin:$JAVA_HOME/bin
[root@elk71 ~]#
[root@elk71 ~]# source /etc/profile.d/zk.sh
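1.3 Prepare the configuration file
[root@elk71 ~]# cp /Project/softwares/apache-zookeeper-3.8.4-bin/conf/zoo{_sample,}.cfg
[root@elk71 ~]# cat /Project/softwares/apache-zookeeper-3.8.4-bin/conf/zoo.cfg   # the edited configuration file
# Length of one tick, the basic time unit, in milliseconds.
tickTime=2000
# Maximum number of ticks to wait for a follower to connect and sync at startup.
initLimit=5
# Maximum number of ticks to wait for an ACK during data synchronization.
syncLimit=2
# Data directory
dataDir=/Project/data/zk
# Client listening port
clientPort=2181
# Enable the four-letter-word commands; "*" whitelists all of them.
4lw.commands.whitelist=*
# server.ID=A:B:C[:D]
#   ID: unique numeric ID of the ZooKeeper server (must match its myid file).
#   A:  host address of the server.
#   B:  port followers use to connect to the leader for data sync; only the current leader listens on it.
#   C:  leader election port.
#   D:  optional, the role of the server (for example observer).
server.71=10.0.0.71:2888:3888
server.72=10.0.0.72:2888:3888
server.73=10.0.0.73:2888:3888
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true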
[root@elk71 ~]#
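The tick-based limits become wall-clock timeouts once they are multiplied by tickTime. A minimal sketch of that arithmetic, using the values from the zoo.cfg above (the variable names init_timeout_ms and sync_timeout_ms are only illustrative):

```shell
# Values taken from the zoo.cfg shown above.
tickTime=2000   # milliseconds per tick
initLimit=5     # ticks allowed for the initial connect and sync
syncLimit=2     # ticks allowed for a sync ACK

# Convert the tick counts into milliseconds.
init_timeout_ms=$((initLimit * tickTime))
sync_timeout_ms=$((syncLimit * tickTime))

echo "initial sync timeout: ${init_timeout_ms} ms"
echo "sync ACK timeout: ${sync_timeout_ms} ms"
```

With these settings a follower gets 10 seconds for its initial sync and 4 seconds for a sync ACK.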
1.4 Configure host name resolution
[root@elk71 ~]# cat >> /etc/hosts <<EOF
10.0.0.71 elk71
10.0.0.72 elk72
10.0.0.73 elk73
EOF
1.5 Sync the files to the other nodes
[root@elk71 ~]# scp /etc/hosts root@10.0.0.72:/etc/
[root@elk71 ~]# scp /etc/hosts root@10.0.0.73:/etc/
[root@elk71 ~]# scp -r /Project/softwares/apache-zookeeper-3.8.4-bin root@10.0.0.72:/Project/softwares/
[root@elk71 ~]# scp -r /Project/softwares/apache-zookeeper-3.8.4-bin root@10.0.0.73:/Project/softwares/
[root@elk71 ~]# scp /etc/profile.d/zk.sh root@10.0.0.72:/etc/profile.d/
[root@elk71 ~]# scp /etc/profile.d/zk.sh root@10.0.0.73:/etc/profile.d/
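1.6 Generate the myid files (the data directory is created first if it does not exist)
[root@elk71 ~]# for ((host_id=71;host_id<=73;host_id++)) do ssh elk${host_id} "mkdir -p /Project/data/zk && echo ${host_id} > /Project/data/zk/myid";done
1.7 Start the service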
[root@elk71 ~]# zkServer.sh start
[root@elk72 ~]# source /etc/profile.d/zk.sh
[root@elk72 ~]# zkServer.sh start
[root@elk73 ~]# source /etc/profile.d/zk.sh
[root@elk73 ~]# zkServer.sh start
1.8 Check the service status [the leader may be on any node, but there must never be two leaders]
[root@elk71 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@elk71 ~]#

[root@elk72 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@elk72 ~]#

[root@elk73 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@elk73 ~]#
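The "exactly one leader" rule can also be checked by a script instead of by reading three outputs. A minimal sketch: the helper name count_leaders is hypothetical, and it simply counts "Mode: leader" lines on standard input.

```shell
# Count how many "Mode: leader" lines appear on standard input.
# "|| true" keeps the exit status at 0 even when grep finds no match.
count_leaders() {
  grep -c 'Mode: leader' || true
}

# Sample input shaped like the status output above: one leader, two followers.
printf 'Mode: leader\nMode: follower\nMode: follower\n' | count_leaders

# Hypothetical usage against the live cluster. It assumes nc is installed and
# relies on the four-letter-word whitelist configured in zoo.cfg:
#   for h in elk71 elk72 elk73; do echo srvr | nc "$h" 2181; done | count_leaders
```

A healthy ensemble should report 1; any other value deserves a closer look.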
2. Connect to the ZooKeeper cluster and verify high availability
2.1 Connection test
[root@elk73 ~]# zkCli.sh -server 10.0.0.71:2181,10.0.0.72:2181,10.0.0.73:2181
...
WatchedEvent state:SyncConnected type:None path:null
[zk: 10.0.0.71:2181,10.0.0.72:2181,10.0.0.73:2181(CONNECTED) 0]
[zk: 10.0.0.71:2181,10.0.0.72:2181,10.0.0.73:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: 10.0.0.71:2181,10.0.0.72:2181,10.0.0.73:2181(CONNECTED) 1]
2.2 Stop the leader node [the cluster elects a new leader automatically and keeps serving clients]
[root@elk71 ~]# zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
[root@elk71 ~]#
[root@elk71 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
[root@elk71 ~]#

[root@elk72 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@elk72 ~]#

[root@elk73 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@elk73 ~]#
2.3 Stop one more follower node [a ZooKeeper cluster serves clients only while more than half of its nodes are alive]
1) Stop node 72
[root@elk72 ~]# zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
[root@elk72 ~]#
[root@elk72 ~]#
[root@elk72 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
[root@elk72 ~]#
2) The leader on node 73 stops serving as well, because it has lost its quorum
[root@elk73 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /Project/softwares/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
[root@elk73 ~]#
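The behavior above follows from the majority rule: an ensemble of n servers needs n/2 + 1 of them (integer division) to be alive. A minimal sketch of that arithmetic; the function names quorum_size and tolerated_failures are hypothetical.

```shell
# Smallest number of live servers that still forms a majority.
quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Number of servers that may fail while the ensemble keeps serving.
tolerated_failures() {
  echo $(( $1 - ($1 / 2 + 1) ))
}

quorum_size 3          # prints 2
tolerated_failures 3   # prints 1
```

For the three-node cluster here, one node may fail; once two are down, the remaining node cannot form a quorum and refuses to serve. An ensemble of four tolerates no more failures than one of three, which is why odd sizes are the usual choice.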
2.4 Once verified, tune the ZooKeeper JVM heap
[root@elk71 ~]# vim /Project/softwares/apache-zookeeper-3.8.4-bin/bin/zkEnv.sh
...
# default heap for zookeeper server
# ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-128}"
# export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"
export SERVER_JVMFLAGS="-Xms${ZK_SERVER_HEAP}m -Xmx${ZK_SERVER_HEAP}m"

# default heap for zookeeper client
# ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-128}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"
[root@elk71 ~]# scp /Project/softwares/apache-zookeeper-3.8.4-bin/bin/zkEnv.sh root@10.0.0.72:/Project/softwares/apache-zookeeper-3.8.4-bin/bin/
[root@elk71 ~]# scp /Project/softwares/apache-zookeeper-3.8.4-bin/bin/zkEnv.sh root@10.0.0.73:/Project/softwares/apache-zookeeper-3.8.4-bin/bin/
2.5 Restart the ZooKeeper cluster one node at a time (rolling restart)
[root@elk71 ~]# zkServer.sh restart
[root@elk71 ~]# zkServer.sh status

[root@elk72 ~]# zkServer.sh restart
[root@elk72 ~]# zkServer.sh status

[root@elk73 ~]# zkServer.sh restart
[root@elk73 ~]# zkServer.sh status
2.6 Verify the JVM heap size on every node
[root@elk71 ~]# ps -ef | grep zookeeper | grep -i xmx
root 8727 1 38 10:10 pts/0 00:00:01 /usr/share/elasticsearch/jdk/bin/java ... -Xms128m -Xmx128m
[root@elk71 ~]#
[root@elk71 ~]# free -h
total used free shared buff/cache available
Mem: 3.8Gi 1.3Gi 910Mi 1.0Mi 1.6Gi 2.2Gi
Swap: 4.0Gi 0B 4.0Gi
[root@elk71 ~]#
3. Kafka deployment [consider a Kafka cluster once the total data volume reaches 100 TB or more]
3.1 Download and extract kafka_2.13-3.8.0.tgz on all nodes
[root@elk71 ~]# wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
[root@elk71 ~]# tar xf kafka_2.13-3.8.0.tgz -C /Project/softwares/
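3.2 Edit the Kafka configuration file on all nodes
[root@elk71 ~]# vim /Project/softwares/kafka_2.13-3.8.0/config/server.properties
...
# Broker ID: use the last octet of each node's IP address (71, 72, 73).
# Keep the comment on its own line; a properties file has no inline comments.
broker.id=71
# Data directory
log.dirs=/Project/data/kafka
# ZooKeeper cluster address (with chroot path) used to store metadata
zookeeper.connect=10.0.0.71:2181,10.0.0.72:2181,10.0.0.73:2181/nolen_kafka380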
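Since broker.id follows the last octet of each node's IP address, it can be derived instead of typed by hand on every node. A minimal sketch; the helper name broker_id_from_ip is hypothetical.

```shell
# Print the last octet of an IPv4 address, e.g. 10.0.0.72 -> 72.
broker_id_from_ip() {
  echo "${1##*.}"
}

broker_id_from_ip 10.0.0.72   # prints 72

# Hypothetical way to apply it on a node (GNU sed, as shipped with Ubuntu):
#   sed -i "s/^broker.id=.*/broker.id=$(broker_id_from_ip 10.0.0.72)/" \
#     /Project/softwares/kafka_2.13-3.8.0/config/server.properties
```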
3.3 Start the Kafka instances
[root@elk71 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[root@elk72 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[root@elk73 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
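The start commands above rely on $KAFKA_HOME being set and on kafka-server-start.sh being on the PATH, but the step that sets them up is not shown. A minimal sketch of a profile script modeled on zk.sh from section 1.2: the file name /etc/profile.d/kafka.sh is an assumption, and the install path follows from the tar command in 3.1.

```shell
#!/bin/bash
# Hypothetical /etc/profile.d/kafka.sh, modeled on zk.sh from section 1.2.
export JAVA_HOME=/usr/share/elasticsearch/jdk
export KAFKA_HOME=/Project/softwares/kafka_2.13-3.8.0
export PATH=$PATH:$KAFKA_HOME/bin:$JAVA_HOME/bin
```

Like zk.sh, it would have to be copied to every node and loaded with source before Kafka is started.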
4. Filebeat writes data to the Kafka cluster
4.1 Create the topic
[root@elk71 ~]# kafka-topics.sh --bootstrap-server 10.0.0.72:9092 --topic nolen-elk --partitions 3 --replication-factor 2 --create
Created topic nolen-elk.
[root@elk71 ~]#
[root@elk71 ~]# kafka-topics.sh --bootstrap-server 10.0.0.72:9092 --topic nolen-elk --describe
[2024-09-03 16:03:46,101] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient)
Topic: nolen-elk TopicId: 1OljOARkToC41ynUFU7gFw PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: nolen-elk Partition: 0 Leader: 73 Replicas: 73,71 Isr: 73,71 Elr: N/A LastKnownElr: N/A
Topic: nolen-elk Partition: 1 Leader: 71 Replicas: 71,72 Isr: 71,72 Elr: N/A LastKnownElr: N/A
Topic: nolen-elk Partition: 2 Leader: 72 Replicas: 72,73 Isr: 72,73 Elr: N/A LastKnownElr: N/A
[root@elk71 ~]#
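In the --describe output above, every partition lists two replicas and both of them appear in the ISR. A rough sketch of how that check could be scripted: the helper name under_replicated is hypothetical, and it only parses text in the format shown above. kafka-topics.sh also offers a built-in --under-replicated-partitions option for --describe.

```shell
# Print the number of every partition whose ISR is smaller than its replica list.
under_replicated() {
  awk '/Partition:/ {
    r = 0; s = 0; p = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Partition:") p = $(i + 1)
      if ($i == "Replicas:")  r = split($(i + 1), reps, ",")
      if ($i == "Isr:")       s = split($(i + 1), isr, ",")
    }
    if (s < r) print p
  }'
}

# Made-up sample line: broker 71 is missing from the ISR of partition 0.
printf 'Topic: nolen-elk Partition: 0 Leader: 73 Replicas: 73,71 Isr: 73 Elr: N/A LastKnownElr: N/A\n' | under_replicated
```

Against a live cluster the output of the --describe command would be piped into the function; an empty result means all replicas are in sync.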
4.2 Filebeat writes data to Kafka
[root@elk71 ~]# cat /etc/filebeat/nginx-to-kafka.yaml
filebeat:
  inputs:
  - type: filestream
    paths:
    - /var/log/nginx/access.log*

output.kafka:
  # Kafka cluster addresses
  hosts: ["elk71:9092", "elk72:9092", "elk73:9092"]
  # Topic to write to
  topic: 'nolen-elk'
[root@elk71 ~]# filebeat -e -c /etc/filebeat/nginx-to-kafka.yaml
4.3 Verify on a Kafka node
[root@elk72 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.72:9092 --topic nolen-elk --from-beginning --group filebeat01
...
{"@timestamp":"2024-09-03T08:10:18.803Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.23"},"message":"77.13.20.11 - - [28/Aug/2024:07:48:24 +0000] \"GET / HTTP/1.1\" 200 396 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15\"","input":
...
5. Logstash reads data from the Kafka cluster
5.1 Write the configuration file
[root@elk73 ~]# cat /etc/logstash/conf.d/kafka-to-es.conf
input {
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "10.0.0.71:9092,10.0.0.72:9092,10.0.0.73:9092"


    # Topics to consume
    topics => ["nolen-elk"]
    # Consumer group
    group_id => "linux-elk"
    # Where to start reading when the group has no committed offset:
    # "earliest" starts from the oldest data, "latest" from the newest.
    auto_offset_reset => "earliest"
  }
}

filter {
  json {
    source => "message"
    remove_field => [ "input","host","agent","@version","log", "ecs" ]
  }

  # Match arbitrary text with regular expressions; grok ships with about 120 built-in patterns.
  grok {
    match => {
      "message" => "%{HTTPD_COMBINEDLOG}"
    }
  }

  useragent {
    source => "agent"
    target => "nolen_agent"
  }

  geoip {
    source => "clientip"
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  stdout {
  }

  elasticsearch {
    hosts => ["10.0.0.71:9200","10.0.0.72:9200","10.0.0.73:9200"]
    index => "kafka-elk-%{+yyyy.MM.dd}"
    user => "elastic"
    password => "123456"
  }
}
[root@elk73 ~]#
5.2 Start the Logstash instance
[root@elk73 ~]# logstash -rf /etc/logstash/conf.d/kafka-to-es.conf
