ZooKeeper类初探

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:     ZooKeeper源码分析的版本:3.4.10。 一.创建ZooKeeper对象     ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动作。

    ZooKeeper源码分析的版本:3.4.10。

一.创建ZooKeeper对象

    ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动作。 

    ZooKeeper有三种Watcher列表: (1)DataWatcher (2)ExistWatcher (3)ChildWatcher.

    ClientCnxn是客户端和服务端通信的底层接口,和ClientCnxnSocket一起工作提供网络通信服务。

protected final ClientCnxn cnxn;// 成员变量cnxn,连接服务器,通过cnxn发送命令给服务端
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    ......// 打印log
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);// 从传入的服务器地址字符串中解析出服务器地址
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());// 提供服务器地址,当服务器发生故障无法连接时,会自动连接其它的服务器
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);// 构建和服务器通信的对象cnxn
    cnxn.start();
}


二.create操作

    调用create在ZooKeeper中创建一个Node,返回值是成功创建的路径名称: 首先看看 create 方法:


public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath, createMode.isSequential());
    final String serverPath = prependChroot(clientPath);
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.create);// 设置操作代码为create
    CreateRequest request = new CreateRequest();
    CreateResponse response = new CreateResponse();
    request.setData(data);// 使用输入参数构造CreateRequest请求
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    if (acl != null && acl.size() == 0) {
        throw new KeeperException.InvalidACLException();
    }
    request.setAcl(acl);
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 将请求提交发送给服务器
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (cnxn.chrootPath == null) {
        return response.getPath();// 从返回的CreateResponse中获取创建成功后的路径
    } else {
        return response.getPath().substring(cnxn.chrootPath.length());
    }
}
    

    在 create 中通过 submitRequest 来提交请求:


public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);// 将CreateRequest转换成Packet包
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}



    queuePacket 将CreateRequest转换成Packet包:


Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);// 将CreateRequest转换成Packet包
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);// 将发送包放入队列,等待发送线程发送给服务器
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}


三.delete操作

    删除节点操作,提供同步和异步两种接口方式:


public void delete(final String path, int version, VoidCallback cb,
        Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);// 校验传入的路径是否合法
    final String serverPath;
    // maintain semantics even in chroot case
    // specifically - root cannot be deleted
    // I think this makes sense even in chroot case.
    if (clientPath.equals("/")) {
        // a bit of a hack, but delete(/) will never succeed and ensures
        // that the same semantics are maintained
        serverPath = clientPath;
    } else {
        serverPath = prependChroot(clientPath);
    }
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.delete);// 设置操作代码为delete
    DeleteRequest request = new DeleteRequest();
    request.setPath(serverPath);// 使用输入参数构造DeleteRequest请求
    request.setVersion(version);
    cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
            serverPath, ctx, null);// 和create操作一样,调用queuePacket方法,将DeleteRequest转换成Packet包
}

四.其他类似操作

     exists:判断节点是否存在,异步方式。构造ExistsRequest请求对象,设置操作码ZooDefs.OpCode.exists;

     getData:获取节点关联数据。构造GetDataRequest请求对象,设置操作码ZooDefs.OpCode.getData;

     setData:设置节点关联数据。构造SetDataRequest请求对象,设置操作码ZooDefs.OpCode.setData;

     getChildren:获取子节点路径列表。构造GetChildrenRequest请求对象,设置操作码ZooDefs.OpCode.getChildren


    看完ZooKeeper类分析,是不是觉得很简单,都是差不多的套路:构建对应服务器操作的请求对象,打包成Packet,然后等待发送线程把这些发送包发送给服务器。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
22天前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
5月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
51 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
52 1
|
3月前
分布式-Zookeeper-数据订阅
分布式-Zookeeper-数据订阅
|
3月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
52 0
下一篇
DataWorks