6.Watcher机制(三)之ZooKeeper

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 本文深入分析ZooKeeper类源码,涵盖其内部类结构、核心属性与构造函数,重点解析create、delete、exists等同步/异步操作的实现机制,揭示Watcher注册管理及请求提交流程,全面展现客户端与服务端交互原理。

一、前言
  前面已经分析了Watcher机制中的大多数类,本篇对于ZKWatchManager的外部类Zookeeper进行分析。
二、ZooKeeper源码分析
2.1 类的内部类
  ZooKeeper的内部类框架图如下图所示
  
  
说明:
ZKWatchManager,Zookeeper的Watcher管理者,其源码在之前已经分析过,不再累赘。
WatchRegistration,抽象类,用作watch注册。
ExistsWatchRegistration,存在性watch注册。
DataWatchRegistration,数据watch注册。
ChildWatchRegistration,子节点注册。
States,枚举类型,表示服务器的状态。

  1. WatchRegistration
      接口类型,表示对路径注册监听。  
    说明:可以看到WatchRegistration包含了Watcher和clientPath字段,表示监听和对应的路径,值得注意的是getWatches方式抽象方法,需要子类实现,而在register方法中会调用getWatches方法,实际上调用的是子类的getWatches方法,这是典型的工厂模式。register方法首先会判定是否需要添加监听,然后再进行相应的操作,在WatchRegistration类的默认实现中shouldAddWatch是判定返回码是否为0。
  2. ExistsWatchRegistration 
    说明:ExistsWatchRegistration 表示对存在性监听的注册,其实现了getWatches方法,并且重写了shouldAddWatch方法,getWatches方法是根据返回码的值确定返回dataWatches或者是existWatches。
  3. DataWatchRegistration
    说明:DataWatchRegistration表示对数据监听的注册,其实现了getWatches方法,返回dataWatches。
  4. ChildWatchRegistration
    说明:ChildWatchRegistration表示对子节点监听的注册,其实现了getWatches方法,返回childWatches。
  5. States
    说明:States为枚举类,表示服务器的状态,其有两个方法,判断服务器是否存活和判断客户端是否连接至服务端。
    2.2 类的属性  
      说明:ZooKeeper类存维护一个ClientCnxn类,用来管理客户端与服务端的连接。  
    2.3 类的构造函数
  6. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)型构造函数    
      说明:该构造函数会初始化WatchManager的defaultWatcher,同时会解析服务端地址和端口号,之后根据服务端的地址生成HostProvider(其会打乱服务器的地址),之后生成客户端管理并启动,注意此时会调用getClientCnxnSocket函数,其源码如下  
    说明:该函数会利用反射创建ClientCnxnSocketNIO实例
  7. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException型构造函数  
      说明:此型构造函数和之前构造函数的区别在于本构造函数提供了sessionId和sessionPwd,这表明用户已经之前已经连接过服务端,所以能够获取到sessionId,其流程与之前的构造函数类似,不再累赘。
    2.4 核心函数分析
  8. create函数  
    函数签名:
    public String create(final String path, byte data[], List acl, CreateMode createMode)
    throws KeeperException, InterruptedException
    说明:该create函数是同步的,主要用作创建节点,其大致步骤如下
      ① 验证路径是否合法,若不合法,抛出异常,否则进入②
      ② 添加根空间,生成请求头、请求、响应等,并设置相应字段,进入③
      ③ 通过客户端提交请求,判断返回码是否为0,若不是,则抛出异常,否则,进入④
      ④ 除去根空间后,返回响应的路径
      其中会调用submitRequest方法,其源码如下  
    说明:submitRequest会将请求封装成Packet包,然后一直等待packet包响应结束,然后返回;若没结束,则等待。可以看到其是一个同步方法。
  9. create函数
    函数签名:
    public void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)  
    说明:该create函数是异步的,其大致步骤与同步版的create函数相同,只是最后其会将请求打包成packet,然后放入队列等待提交。
  10. delete函数  
    函数签名:public void delete(final String path, int version) throws InterruptedException, KeeperException
    说明:该函数是同步的,其流程与create流程相似,不再累赘。
  11. delete函数
    函数签名:public void delete(final String path, int version, VoidCallback cb, Object ctx)
      说明:该函数是异步的,其流程也相对简单,不再累赘。
  12. multi函数  
    说明:该函数用于执行多个操作或者不执行,其首先会验证每个操作的合法性,然后将每个操作添加根空间后加入到事务列表中,之后会调用multiInternal函数,其源码如下  
    说明:multiInternal函数会提交多个操作并且等待响应结果集,然后判断结果集中是否有异常,若有异常则抛出异常,否则返回响应结果集。
  13. exists函数  
    函数签名:public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
    Java
    运行代码
    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public Stat exists(final String path, Watcher watcher)

     throws KeeperException, InterruptedException
    

    {

     final String clientPath = path;
    
     // 验证路径是否合法
     PathUtils.validatePath(clientPath);
     // the watch contains the un-chroot path
     WatchRegistration wcb = null;
     if (watcher != null) { // 生成存在性注册
         wcb = new ExistsWatchRegistration(watcher, clientPath);
     }
     // 添加根空间
     final String serverPath = prependChroot(clientPath);
     // 新生请求头
     RequestHeader h = new RequestHeader();
     // 设置请求头类型
     h.setType(ZooDefs.OpCode.exists);
     // 新生节点存在请求
     ExistsRequest request = new ExistsRequest();
     // 设置路径
     request.setPath(serverPath);
     // 设置Watcher
     request.setWatch(watcher != null);
     // 新生设置数据响应
     SetDataResponse response = new SetDataResponse();
     // 提交请求
     ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
     if (r.getErr() != 0) { // 判断返回码
         if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
             return null;
         }
         throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                 clientPath);
     }
    
     // 返回结果的状态
     return response.getStat().getCzxid() == -1 ? null : response.getStat();
    

    }
    说明:该函数是同步的,用于判断指定路径的节点是否存在,值得注意的是,其会对指定路径的结点进行注册监听。

  14. exists
    函数签名:public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) 
    Java
    运行代码
    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public void exists(final String path, Watcher watcher,
         StatCallback cb, Object ctx)
    
    {
    final String clientPath = path;
    // 验证路径是否合法
    PathUtils.validatePath(clientPath);
    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) { // 生成存在性注册
     wcb = new ExistsWatchRegistration(watcher, clientPath);
    
    }
    // 添加根空间
    final String serverPath = prependChroot(clientPath);
    // 新生请求头
    RequestHeader h = new RequestHeader();
    // 设置请求头类型
    h.setType(ZooDefs.OpCode.exists);
    // 新生节点存在请求
    ExistsRequest request = new ExistsRequest();
    // 设置路径
    request.setPath(serverPath);
    // 设置Watcher
    request.setWatch(watcher != null);
    // 新生设置数据响应
    SetDataResponse response = new SetDataResponse();
    // 将请求封装成packet,放入队列,等待执行
    cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                  clientPath, serverPath, ctx, wcb);
    
    }
    说明:该函数是异步的,与同步的流程相似,不再累赘。
    之后的getData、setData、getACL、setACL、getChildren函数均类似,只是生成的响应类别和监听类别不相同,大同小异,不再累赘。
    三、总结
      本篇博文分析了Watcher机制的ZooKeeper类,该类包括了对服务器的很多事务性操作,并且包含了同步和异步两个版本,但是相对来说,较为简单。
相关文章
|
3月前
|
Java 测试技术 Linux
生产环境发布管理
本文介绍大型团队如何通过自动化部署平台实现多环境(dev/test/pre/prod)发布管理,涵盖各环境职责、基于Jenkins+K8S的CI/CD流程、分支可视化操作、容器化部署机制及日志排查方案,提升发布效率与系统稳定性。
|
3月前
|
Java Nacos Maven
Eureka服务注册与发现
本章介绍SpringCloud中Eureka注册中心的搭建与使用,完成eureka-server、user-service、order-service的集成部署,实现服务注册与发现。虽Eureka已被逐步替代,但其功能实现为后续Nacos替换奠定基础。
|
3月前
|
敏捷开发 Dubbo Java
需求开发人日评估
本文介绍敏捷开发中工时评估的关键——人日估算方法,涵盖开发、自测、联调、测试及发布各阶段周期参考,并提供常见需求如增删改查、导入导出、跨服务调用等的典型人日参考,助力团队科学规划迭代。
|
3月前
|
敏捷开发 Java 测试技术
为什么要单元测试
本文探讨单元测试如何让软件开发“提速”而非“踩刹车”。通过解析测试体系演进、测试金字塔理念,阐述单元测试在提升调试效率、代码质量与研发效能方面的核心价值,揭示其作为高质量软件基石的重要性。
|
3月前
|
存储 运维 Java
微服务概述
本文介绍了单体应用与微服务架构的区别,阐述了微服务的定义、特征、优缺点及技术实现方案。微服务通过服务拆分,实现独立开发、部署与扩展,提升系统灵活性和可维护性,但也带来运维、分布式事务等挑战。文章最后探讨了技术选型与架构图设计,为微服务落地提供理论指导。(238字)
|
3月前
|
Java 测试技术 Shell
Jmeter快速入门
JMeter是基于JDK的性能测试工具,需先安装并配置JDK。下载解压后,通过bin目录下的脚本启动,支持中文设置与HTTP请求测试,可添加线程组、取样器及监听器,实现接口与性能测试,操作简便,功能强大。(238字)
|
3月前
|
关系型数据库 MySQL Linux
开发环境搭建
工欲善其事,必先利其器。学习前请确保电脑内存16G以上(推荐32G),建议配备便携显示器分屏开发以提升效率。下载并安装虚拟机及课程资料,配置CentOS 7虚拟机(IP: 192.168.101.68),使用FinalShell远程连接,启动Docker、MySQL等服务。苹果用户需自行安装Docker与MySQL 8。详见配置文档。
|
3月前
|
存储 消息中间件 开发框架
应用架构图
在业务架构基础上,技术架构将需求转化为技术实现。它涵盖分层设计、技术选型与关键技术关系,形成完整架构图。单体架构通常分为表现、业务、数据和基础四层,分别支撑用户交互、业务逻辑、数据存储与通用能力。分布式架构则体现应用间及外部系统的调用关系,明确内外边界,指导系统集成与扩展。
|
3月前
|
消息中间件 Java Kafka
消息中间件RabbitMQ(基础)
本章节深入讲解消息中间件在微服务架构中的核心作用,通过RabbitMQ实现异步通信,解耦服务、提升性能与可靠性,并结合SpringAMQP简化开发流程,助力构建高效、可扩展的分布式系统。
|
6天前
|
人工智能 自然语言处理 JavaScript
2026年Windows+Ollama本地部署OpenClaw保姆级教程:本地AI Agent+阿里云上快速搭建
2026年OpenClaw凭借本地部署、私有化运行的特性,成为打造个人智能体的核心工具,而Ollama作为轻量级本地大模型管理工具,能让OpenClaw摆脱对云端大模型的依赖,实现**本地推理、数据不泄露、全流程私有化**的智能体验。本文基于Windows 11系统,从硬件环境准备、Ollama安装与模型定制、OpenClaw部署配置、技能扩展到常见问题排查,打造保姆级本地部署教程,同时补充阿里云OpenClaw(Clawdbot)快速部署步骤,兼顾本地私有化需求与云端7×24小时运行需求,文中所有代码命令均可直接复制执行,确保零基础用户也能快速搭建属于自己的本地智能体。
8016 63