Watcher机制(三)之ZooKeeper

简介: 本文深入分析ZooKeeper类的源码,涵盖其内部类、属性、构造函数及核心方法。重点解析Watcher机制中的注册流程,以及create、delete、exists等同步与异步操作的实现原理,揭示ZooKeeper客户端与服务端交互的核心逻辑。

一、前言  前面已经分析了Watcher机制中的大多数类,本篇对于ZKWatchManager的外部类Zookeeper进行分析。二、ZooKeeper源码分析2.1 类的内部类  ZooKeeper的内部类框架图如下图所示  

  说明:ZKWatchManager,Zookeeper的Watcher管理者,其源码在之前已经分析过,不再累赘。WatchRegistration,抽象类,用作watch注册。ExistsWatchRegistration,存在性watch注册。DataWatchRegistration,数据watch注册。ChildWatchRegistration,子节点注册。States,枚举类型,表示服务器的状态。1. WatchRegistration  接口类型,表示对路径注册监听。  说明:可以看到WatchRegistration包含了Watcher和clientPath字段,表示监听和对应的路径,值得注意的是getWatches方式抽象方法,需要子类实现,而在register方法中会调用getWatches方法,实际上调用的是子类的getWatches方法,这是典型的工厂模式。register方法首先会判定是否需要添加监听,然后再进行相应的操作,在WatchRegistration类的默认实现中shouldAddWatch是判定返回码是否为0。2. ExistsWatchRegistration 说明:ExistsWatchRegistration 表示对存在性监听的注册,其实现了getWatches方法,并且重写了shouldAddWatch方法,getWatches方法是根据返回码的值确定返回dataWatches或者是existWatches。3. DataWatchRegistration 说明:DataWatchRegistration表示对数据监听的注册,其实现了getWatches方法,返回dataWatches。4. ChildWatchRegistration 说明:ChildWatchRegistration表示对子节点监听的注册,其实现了getWatches方法,返回childWatches。5. States说明:States为枚举类,表示服务器的状态,其有两个方法,判断服务器是否存活和判断客户端是否连接至服务端。2.2 类的属性    说明:ZooKeeper类存维护一个ClientCnxn类,用来管理客户端与服务端的连接。  2.3 类的构造函数1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)型构造函数      说明:该构造函数会初始化WatchManager的defaultWatcher,同时会解析服务端地址和端口号,之后根据服务端的地址生成HostProvider(其会打乱服务器的地址),之后生成客户端管理并启动,注意此时会调用getClientCnxnSocket函数,其源码如下  说明:该函数会利用反射创建ClientCnxnSocketNIO实例2. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException型构造函数    说明:此型构造函数和之前构造函数的区别在于本构造函数提供了sessionId和sessionPwd,这表明用户已经之前已经连接过服务端,所以能够获取到sessionId,其流程与之前的构造函数类似,不再累赘。2.4 核心函数分析1. create函数  函数签名:public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException说明:该create函数是同步的,主要用作创建节点,其大致步骤如下  ① 验证路径是否合法,若不合法,抛出异常,否则进入②  ② 添加根空间,生成请求头、请求、响应等,并设置相应字段,进入③  ③ 通过客户端提交请求,判断返回码是否为0,若不是,则抛出异常,否则,进入④  ④ 除去根空间后,返回响应的路径  其中会调用submitRequest方法,其源码如下  说明:submitRequest会将请求封装成Packet包,然后一直等待packet包响应结束,然后返回;若没结束,则等待。可以看到其是一个同步方法。2. create函数函数签名:public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)  说明:该create函数是异步的,其大致步骤与同步版的create函数相同,只是最后其会将请求打包成packet,然后放入队列等待提交。3. delete函数  函数签名:public void delete(final String path, int version) throws InterruptedException, KeeperException说明:该函数是同步的,其流程与create流程相似,不再累赘。4. delete函数函数签名:public void delete(final String path, int version, VoidCallback cb, Object ctx)  说明:该函数是异步的,其流程也相对简单,不再累赘。5. multi函数  说明:该函数用于执行多个操作或者不执行,其首先会验证每个操作的合法性,然后将每个操作添加根空间后加入到事务列表中,之后会调用multiInternal函数,其源码如下  说明:multiInternal函数会提交多个操作并且等待响应结果集,然后判断结果集中是否有异常,若有异常则抛出异常,否则返回响应结果集。6. exists函数  函数签名:public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException说明:该函数是同步的,用于判断指定路径的节点是否存在,值得注意的是,其会对指定路径的结点进行注册监听。7. exists函数签名:public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) 

Java

运行代码复制代码

public void exists(final String path, Watcher watcher,

           StatCallback cb, Object ctx)

{

   final String clientPath = path;

   // 验证路径是否合法

   PathUtils.validatePath(clientPath);

   // the watch contains the un-chroot path

   WatchRegistration wcb = null;

   if (watcher != null) { // 生成存在性注册

       wcb = new ExistsWatchRegistration(watcher, clientPath);

   }

   // 添加根空间

   final String serverPath = prependChroot(clientPath);

   // 新生请求头

   RequestHeader h = new RequestHeader();

   // 设置请求头类型

   h.setType(ZooDefs.OpCode.exists);

   // 新生节点存在请求

   ExistsRequest request = new ExistsRequest();

   // 设置路径

   request.setPath(serverPath);

   // 设置Watcher

   request.setWatch(watcher != null);

   // 新生设置数据响应

   SetDataResponse response = new SetDataResponse();

   // 将请求封装成packet,放入队列,等待执行

   cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,

                    clientPath, serverPath, ctx, wcb);

}


public void exists(final String path, Watcher watcher,

StatCallback cb, Object ctx)

{

final String clientPath = path;

// 验证路径是否合法

PathUtils.validatePath(clientPath);

// the watch contains the un-chroot path

WatchRegistration wcb = null;

if (watcher != null) { // 生成存在性注册

wcb = new ExistsWatchRegistration(watcher, clientPath);

}

// 添加根空间

final String serverPath = prependChroot(clientPath);

// 新生请求头

RequestHeader h = new RequestHeader();

// 设置请求头类型

h.setType(ZooDefs.OpCode.exists);

// 新生节点存在请求

ExistsRequest request = new ExistsRequest();

// 设置路径

request.setPath(serverPath);

// 设置Watcher

request.setWatch(watcher != null);

// 新生设置数据响应

SetDataResponse response = new SetDataResponse();

// 将请求封装成packet,放入队列,等待执行

cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,

clientPath, serverPath, ctx, wcb);

}

说明:该函数是异步的,与同步的流程相似,不再累赘。之后的getData、setData、getACL、setACL、getChildren函数均类似,只是生成的响应类别和监听类别不相同,大同小异,不再累赘。三、总结  本篇博文分析了Watcher机制的ZooKeeper类,该类包括了对服务器的很多事务性操作,并且包含了同步和异步两个版本,但是相对来说,较为简单。


相关文章
|
2月前
|
缓存 安全 Java
Watcher机制(二)WatchManager
本文深入分析ZooKeeper中WatchManager类的源码,重点解析其如何通过watchTable和watch2Paths两个映射管理Watcher与节点路径的关联关系,涵盖addWatch、removeWatcher、triggerWatch等核心方法的同步机制与执行流程,揭示事件监听与触发的底层原理。
39 0
|
2月前
|
人工智能 机器人 Java
黑马最新项目
AIGC项目涵盖大模型私有化部署、聊天机器人、RAG知识库及代码提示工具;天机AI集成SpringAI与多模型工作流;云岚到家聚焦微服务与分布式架构;四方保险构建统一支付与时序数据应用;星辰WMS与Dify项目即将发布。
100 0
黑马最新项目
|
2月前
|
canal 缓存 关系型数据库
微服务原理篇(Canal-Redis)
本文介绍了ES索引同步的常见方案,重点讲解Canal+MQ数据同步机制。通过解析MySQL的binlog日志,Canal模拟slave伪装接入主库,实现增量数据捕获,并结合RabbitMQ保证消息顺序性地同步至Elasticsearch。同时探讨了缓存一致性问题,提出使用分布式锁(如Redis)控制并发写操作,避免双写不一致。还涵盖Redis持久化、集群模式、过期淘汰策略及缓存三剑客(穿透、雪崩、击穿)的解决方案,系统梳理了高并发场景下的数据同步与缓存保障技术体系。
84 0
 微服务原理篇(Canal-Redis)
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
大模型专业名词解释手册
本手册由油炸小波设计提示词、Manus创作,系统梳理大语言模型核心概念,涵盖基础原理、训练技术、优化压缩、推理应用、评估调试及伦理安全六大模块,深入浅出解析LLM关键技术术语。
277 0
|
2月前
|
Arthas 存储 运维
记Arthas实现一次CPU排查与代码热更新
本文介绍使用Arthas排查Java应用CPU占用过高问题的完整流程,涵盖线程分析、阻塞定位、watch命令追踪异常、jad反编译实现热更新及火焰图分析,实现无需重启应用的高效故障排查与代码修复。
85 0
|
2月前
|
消息中间件 监控 NoSQL
海量数据下的订单超时取消
本文深入解析海量订单超时取消的分布式调度方案,对比Redis ZSet、延时消息、时间轮等技术优劣,结合大厂实践,提出分层架构与多级延迟策略,兼顾性能、可靠与可扩展性,助力构建高可用订单系统。
142 2
|
2月前
|
存储 网络协议 Linux
零拷贝
实现文件传输时,若采用传统read/write方式,每32KB需两次系统调用,引发4次上下文切换,1万次共4万次切换,性能低下。主因是频繁的用户态与内核态切换开销大,且数据需经内存缓冲中转。可优化为使用零拷贝技术(如sendfile),减少数据复制和上下文切换,提升传输效率。
|
2月前
|
存储 Java
One Trick Per Day
初始化Map时应避免直接指定容量,建议使用Guava的`Maps.newHashMapWithExpectedSize()`或手动计算初始容量(如:目标大小 / 0.75 + 1),以防扩容开销。禁止使用Executors创建线程池,因其默认队列无界或线程数无限制,易引发OOM。推荐通过ThreadPoolExecutor显式构造,控制资源使用。
|
2月前
|
运维 自然语言处理 监控
阿里云企业支持计划是什么?企业支持计划服务内容及常见问题解答
阿里云对企业有没有相关的支持或扶持计划?当然是有的,阿里云除基础售后支持外,针对业务系统复杂或对服务有更高要求的客户, 可选择阿里云提供的多种企业支持计划,获取工单极速响应、专属技术保障通道、技术服务经理 (TAM)等专属支持。本文为大家介绍阿里云企业支持计划是什么,介绍其服务内容,并解答一些常见问题。
217 2
|
2月前
|
XML Java 数据格式
SpringBoot@Configuration
`@Configuration` 注解用于标记配置类,相当于 XML 配置文件。配合 `@Bean` 可注册 Bean 实例,通过注解方式启动 IOC 容器,实现组件的自动加载与管理。