【Kafka】zookeeper对于kafka的作用是什么?

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: 【4月更文挑战第6天】【Kafka】zookeeper对于kafka的作用是什么?

image.png

ZooKeeper(动物管理员)是一个开源的分布式协调服务,用于分布式系统中的各种任务,如配置管理、命名服务、分布式同步、集群管理等。对于Kafka这样的分布式消息系统,ZooKeeper发挥着至关重要的作用。在Kafka中,ZooKeeper主要用于以下几个方面:

  1. 集群管理
    ZooKeeper负责管理Kafka集群的成员信息,包括Broker的上线和下线,以及集群中每个Broker的状态。Kafka Broker在启动时会向ZooKeeper注册自己的信息,并定期向ZooKeeper发送心跳以维持其状态。这些信息对于Kafka的整个运行是至关重要的,因为客户端需要知道哪些Broker是活动的,以及它们的网络地址和端口号等信息。

  2. 主题和分区的元数据管理
    ZooKeeper保存了Kafka集群中所有主题(topics)和分区(partitions)的元数据信息,包括主题的名称、分区的数量和分区所在的Broker等。这些元数据对于生产者和消费者来说是必需的,因为它们需要知道消息是如何分布在集群中的,以便正确地发送和接收消息。

  3. Leader选举
    在Kafka的分布式环境中,每个分区都有一个Leader Broker负责处理消息的写入和读取操作。如果Leader Broker失效,ZooKeeper将协助进行新的Leader选举过程,确保分区的高可用性。这是通过在ZooKeeper上创建临时的顺序节点来实现的,所有Broker都可以监视这些节点,并在Leader节点失效时触发重新选举。

  4. 消费者位移(offset)的存储
    Kafka消费者通常会保存其已消费消息的位移信息,以便在断开连接后能够恢复到上次消费的位置。ZooKeeper可以作为一个可靠的存储服务,用于保存消费者的位移信息。消费者可以将位移信息写入ZooKeeper,并在需要时从中读取。这样可以确保即使消费者所在的进程失败,位移信息也不会丢失。

  5. 配额管理
    ZooKeeper还可以用于管理Kafka集群的配额,包括限制生产者和消费者的请求速率、配额大小等。这对于确保集群的稳定性和性能至关重要。

总的来说,ZooKeeper在Kafka中扮演着“管家”的角色,负责管理集群中的各种元数据、状态信息和协调任务,从而保证Kafka集群的可靠运行和高可用性。

1. 创建一个ZooKeeper连接

import org.apache.zookeeper.ZooKeeper;

public class ZookeeperConnection {
   
   
    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            ZooKeeper zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            System.out.println("ZooKeeper session established: " + zooKeeper.getSessionId());
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

2. 创建一个ZooKeeper节点

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateZNode {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            String data = "Hello, ZooKeeper!";
            zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("ZooKeeper node created at " + path);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

3. 读取一个ZooKeeper节点的数据

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ReadZNodeData {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            Stat stat = new Stat();
            byte[] data = zooKeeper.getData(path, false, stat);
            String dataStr = new String(data);
            System.out.println("Data of ZooKeeper node " + path + ": " + dataStr);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

4. 监听一个ZooKeeper节点的变化

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WatchZNode {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            Stat stat = zooKeeper.exists(path, new Watcher() {
   
   
                @Override
                public void process(WatchedEvent event) {
   
   
                    if (event.getType() == Event.EventType.NodeDataChanged) {
   
   
                        System.out.println("Data of ZooKeeper node " + path + " changed");
                    }
                }
            });

            if (stat != null) {
   
   
                byte[] data = zooKeeper.getData(path, true, stat);
                String dataStr = new String(data);
                System.out.println("Data of ZooKeeper node " + path + ": " + dataStr);
            }
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}
相关文章
|
消息中间件 运维 算法
Kafka 为什么要抛弃 Zookeeper?
本文探讨了Kafka为何逐步淘汰ZooKeeper。长久以来,ZooKeeper作为Kafka的核心组件,负责集群管理和协调任务。然而,随着Kafka的发展,ZooKeeper带来的复杂性增加、性能瓶颈及一致性问题日益凸显。为解决这些问题,Kafka引入了KRaft,这是一种基于Raft算法的内置元数据管理方案,不仅简化了部署流程,还提升了系统的一致性和扩展性。本文详细分析了这一转变背后的原因及其带来的优势,并展望了Kafka未来的发展方向。
713 1
|
6月前
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
524 1
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
356 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
473 2
|
消息中间件 Java Kafka
ELFK对接zookeeper&kafka
ELFK对接zookeeper&kafka
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
186 3
|
消息中间件 NoSQL Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)

热门文章

最新文章