【Kafka】zookeeper对于kafka的作用是什么?

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【4月更文挑战第6天】【Kafka】zookeeper对于kafka的作用是什么?

image.png

ZooKeeper(动物管理员)是一个开源的分布式协调服务,用于分布式系统中的各种任务,如配置管理、命名服务、分布式同步、集群管理等。对于Kafka这样的分布式消息系统,ZooKeeper发挥着至关重要的作用。在Kafka中,ZooKeeper主要用于以下几个方面:

  1. 集群管理
    ZooKeeper负责管理Kafka集群的成员信息,包括Broker的上线和下线,以及集群中每个Broker的状态。Kafka Broker在启动时会向ZooKeeper注册自己的信息,并定期向ZooKeeper发送心跳以维持其状态。这些信息对于Kafka的整个运行是至关重要的,因为客户端需要知道哪些Broker是活动的,以及它们的网络地址和端口号等信息。

  2. 主题和分区的元数据管理
    ZooKeeper保存了Kafka集群中所有主题(topics)和分区(partitions)的元数据信息,包括主题的名称、分区的数量和分区所在的Broker等。这些元数据对于生产者和消费者来说是必需的,因为它们需要知道消息是如何分布在集群中的,以便正确地发送和接收消息。

  3. Leader选举
    在Kafka的分布式环境中,每个分区都有一个Leader Broker负责处理消息的写入和读取操作。如果Leader Broker失效,ZooKeeper将协助进行新的Leader选举过程,确保分区的高可用性。这是通过在ZooKeeper上创建临时的顺序节点来实现的,所有Broker都可以监视这些节点,并在Leader节点失效时触发重新选举。

  4. 消费者位移(offset)的存储
    Kafka消费者通常会保存其已消费消息的位移信息,以便在断开连接后能够恢复到上次消费的位置。ZooKeeper可以作为一个可靠的存储服务,用于保存消费者的位移信息。消费者可以将位移信息写入ZooKeeper,并在需要时从中读取。这样可以确保即使消费者所在的进程失败,位移信息也不会丢失。

  5. 配额管理
    ZooKeeper还可以用于管理Kafka集群的配额,包括限制生产者和消费者的请求速率、配额大小等。这对于确保集群的稳定性和性能至关重要。

总的来说,ZooKeeper在Kafka中扮演着“管家”的角色,负责管理集群中的各种元数据、状态信息和协调任务,从而保证Kafka集群的可靠运行和高可用性。

1. 创建一个ZooKeeper连接

import org.apache.zookeeper.ZooKeeper;

public class ZookeeperConnection {
   
   
    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            ZooKeeper zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            System.out.println("ZooKeeper session established: " + zooKeeper.getSessionId());
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

2. 创建一个ZooKeeper节点

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateZNode {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            String data = "Hello, ZooKeeper!";
            zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("ZooKeeper node created at " + path);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

3. 读取一个ZooKeeper节点的数据

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ReadZNodeData {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            Stat stat = new Stat();
            byte[] data = zooKeeper.getData(path, false, stat);
            String dataStr = new String(data);
            System.out.println("Data of ZooKeeper node " + path + ": " + dataStr);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}

4. 监听一个ZooKeeper节点的变化

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WatchZNode {
   
   
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
   
   
        String host = "localhost";
        int port = 2181;
        int sessionTimeout = 3000;

        try {
   
   
            zooKeeper = new ZooKeeper(host + ":" + port, sessionTimeout, null);
            String path = "/example";
            Stat stat = zooKeeper.exists(path, new Watcher() {
   
   
                @Override
                public void process(WatchedEvent event) {
   
   
                    if (event.getType() == Event.EventType.NodeDataChanged) {
   
   
                        System.out.println("Data of ZooKeeper node " + path + " changed");
                    }
                }
            });

            if (stat != null) {
   
   
                byte[] data = zooKeeper.getData(path, true, stat);
                String dataStr = new String(data);
                System.out.println("Data of ZooKeeper node " + path + ": " + dataStr);
            }
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }
}
相关文章
|
21天前
|
消息中间件 缓存 Kafka
Kafka ProducerConfig和ConsumerConfig配置
Kafka ProducerConfig和ConsumerConfig配置
|
3月前
|
消息中间件 存储 Java
ZooKeeper 在 Kafka 中的应用
ZooKeeper 在 Kafka 中的应用
71 0
|
7月前
|
消息中间件 分布式计算 负载均衡
zookeeper+Kafka
zookeeper+Kafka
33 0
|
7月前
|
消息中间件 分布式计算 Hadoop
Kafka集群搭建
Kafka集群搭建
153 1
|
7月前
|
消息中间件 存储 缓存
聊聊 Kafka: Kafka 为啥这么快?
聊聊 Kafka: Kafka 为啥这么快?
|
8月前
|
消息中间件 分布式计算 负载均衡
zookeeper+kafka
zookeeper+kafka
76 0
|
11月前
|
消息中间件 存储 缓存
一起来学kafka之Kafka集群搭建
前言 目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~ 本节给大家讲一下Kafka的一些核心概念以及如何利用docker快速的搭建Kafka集群~ 好了, 废话不多说直接开整吧~ 什么是 Kafka Kafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。
|
11月前
|
消息中间件 缓存 负载均衡
Kafka实战(一) : 认识Kafka
Kafka实战(一) : 认识Kafka
|
消息中间件 运维 负载均衡
Zookeeper 3.5.8 & Kafka 2.4.0 安装与调优
Zookeeper 3.5.8 & Kafka 2.4.0 安装与调优
650 0
|
消息中间件 存储 缓存
【Kafka】(二)kafka 核心组件2
【Kafka】(二)kafka 核心组件2
114 0

热门文章

最新文章