46-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-1)

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,182元/月
简介:   前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

一、前言

  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

二、总体框图

  对于Watcher机制而言,主要涉及的类主要如下。

  

说明:

Watcher

接口类型,其定义了process方法,需子类实现

Event

接口类型,Watcher的内部类,无任何方法

KeeperState

枚举类型,Event的内部类,表示Zookeeper所处的状态

EventType

枚举类型,Event的内部类,表示Zookeeper中发生的事件类型

WatchedEvent

表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType

ClientWatchManager

接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现

ZKWatchManager

Zookeeper的内部类,继承ClientWatchManager

MyWatcher

ZooKeeperMain的内部类,继承Watcher

ServerCnxn

接口类型,继承Watcher,表示客户端与服务端的一个连接

WatchManager

管理Watcher

三、Watcher源码分析

3.1 内部类

  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下

public interface Watcher {


   public interface Event {

       /**

        * Enumeration of states the ZooKeeper may be at the event

        */

       public enum KeeperState {


           @Deprecated

           Unknown (-1),


           Disconnected (0),


           @Deprecated

           NoSyncConnected (1),


           SyncConnected (3),


           AuthFailed (4),


           ConnectedReadOnly (5),


           SaslAuthenticated(6),


           Expired (-112);


           private final int intValue;    


           KeeperState(int intValue) {

               this.intValue = intValue;

           }


           public int getIntValue() {

               return intValue;

           }


           public static KeeperState fromInt(int intValue) {

               switch(intValue) {

                   case   -1: return KeeperState.Unknown;

                   case    0: return KeeperState.Disconnected;

                   case    1: return KeeperState.NoSyncConnected;

                   case    3: return KeeperState.SyncConnected;

                   case    4: return KeeperState.AuthFailed;

                   case    5: return KeeperState.ConnectedReadOnly;

                   case    6: return KeeperState.SaslAuthenticated;

                   case -112: return KeeperState.Expired;


                   default:

                       throw new RuntimeException("Invalid integer value for conversion to KeeperState");

               }

           }

       }


       /**

        * Enumeration of types of events that may occur on the ZooKeeper

        */

       public enum EventType {

           None (-1),

           NodeCreated (1),

           NodeDeleted (2),

           NodeDataChanged (3),

           NodeChildrenChanged (4);


           private final int intValue;    


           EventType(int intValue) {

               this.intValue = intValue;

           }


           public int getIntValue() {

               return intValue;

           }


           public static EventType fromInt(int intValue) {

               switch(intValue) {

                   case -1: return EventType.None;

                   case  1: return EventType.NodeCreated;

                   case  2: return EventType.NodeDeleted;

                   case  3: return EventType.NodeDataChanged;

                   case  4: return EventType.NodeChildrenChanged;


                   default:

                       throw new RuntimeException("Invalid integer value for conversion to EventType");

               }

           }          

       }

   }

}

说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。

可以简化成:

public interface Event {}

3.2 接口方法  

abstract public void process(WatchedEvent event);

说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

四、Event源码分析(即3.1内部类)

4.1 内部类

1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态

   /** Unused, this state is never generated by the server */

   @Deprecated

   // 未知状态,不再使用,服务器不会产生此状态

   Unknown (-1),


   /** The client is in the disconnected state - it is not connected

   * to any server in the ensemble. */

   // 断开

   Disconnected (0),


   /** Unused, this state is never generated by the server */

   @Deprecated

   // 未同步连接,不再使用,服务器不会产生此状态

   NoSyncConnected (1),


   /** The client is in the connected state - it is connected

   * to a server in the ensemble (one of the servers specified

   * in the host connection parameter during ZooKeeper client

   * creation). */

   // 同步连接状态

   SyncConnected (3),


   /**

   * Auth failed state

   */

   // 认证失败状态

   AuthFailed (4),


   /**

   * The client is connected to a read-only server, that is the

   * server which is not currently connected to the majority.

   * The only operations allowed after receiving this state is

   * read operations.

   * This state is generated for read-only clients only since

   * read/write clients aren't allowed to connect to r/o servers.

   */

   // 只读连接状态

   ConnectedReadOnly (5),


   /**

   * SaslAuthenticated: used to notify clients that they are SASL-authenticated,

   * so that they can perform Zookeeper actions with their SASL-authorized permissions.

   */

   // SASL认证通过状态

   SaslAuthenticated(6),


   /** The serving cluster has expired this session. The ZooKeeper

   * client connection (the session) is no longer valid. You must

   * create a new client connection (instantiate a new ZooKeeper

   * instance) if you with to access the ensemble. */

   // 过期状态

   Expired (-112);


   // 代表状态的整形值

   private final int intValue;     // Integer representation of value

   // for sending over wire



   // 构造函数

   KeeperState(int intValue) {

       this.intValue = intValue;

   }


   // 返回整形值

   public int getIntValue() {

       return intValue;

   }


   // 从整形值构造相应的状态

   public static KeeperState fromInt(int intValue) {

       switch(intValue) {

           case   -1: return KeeperState.Unknown;

           case    0: return KeeperState.Disconnected;

           case    1: return KeeperState.NoSyncConnected;

           case    3: return KeeperState.SyncConnected;

           case    4: return KeeperState.AuthFailed;

           case    5: return KeeperState.ConnectedReadOnly;

           case    6: return KeeperState.SaslAuthenticated;

           case -112: return KeeperState.Expired;


           default:

               throw new RuntimeException("Invalid integer value for conversion to KeeperState");

       }

   }

}

说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

2. EventType 

public enum EventType { // 事件类型

   // 无

   None (-1),

   // 结点创建

   NodeCreated (1),

   // 结点删除

   NodeDeleted (2),

   // 结点数据变化

   NodeDataChanged (3),

   // 结点子节点变化

   NodeChildrenChanged (4);


   // 代表事件类型的整形

   private final int intValue;     // Integer representation of value

   // for sending over wire


   // 构造函数

   EventType(int intValue) {

       this.intValue = intValue;

   }


   // 返回整形

   public int getIntValue() {

       return intValue;

   }


   // 从整形构造相应的事件

   public static EventType fromInt(int intValue) {

       switch(intValue) {

           case -1: return EventType.None;

           case  1: return EventType.NodeCreated;

           case  2: return EventType.NodeDeleted;

           case  3: return EventType.NodeDataChanged;

           case  4: return EventType.NodeChildrenChanged;


           default:

               throw new RuntimeException("Invalid integer value for conversion to EventType");

       }

   }          

}

说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

五、WatchedEvent

5.1 类的属性 

public class WatchedEvent {

   // Zookeeper的状态

   final private KeeperState keeperState;

   // 事件类型

   final private EventType eventType;

   // 事件所涉及节点的路径

   private String path;

}

说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

5.2 构造函数

  1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {

   // 初始化属性

   this.keeperState = keeperState;

   this.eventType = eventType;

   this.path = path;

}

  说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

  2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

public WatchedEvent(WatcherEvent eventMessage) {

   // 从eventMessage中取出相应属性进行赋值

   keeperState = KeeperState.fromInt(eventMessage.getState());

   eventType = EventType.fromInt(eventMessage.getType());

   path = eventMessage.getPath();

}

  说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

六、ClientWatchManager

public Set<Watcher> materialize(Watcher.Event.KeeperState state,

                               Watcher.Event.EventType type, String path);

  说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

七、ZKWatchManager(zookeeper内)

7.1 类的属性

private static class ZKWatchManager implements ClientWatchManager {

   

   // 数据变化的Watchers

   private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();

   

   // 节点存在与否的Watchers

   private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();

   

   // 子节点变化的Watchers

   private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();

}

 说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

7.2 核心方法分析

1. materialize方法

public Set<Watcher> materialize(Watcher.Event.KeeperState state,

                               Watcher.Event.EventType type,

                               String clientPath)

{

   // 新生成结果Watcher集合

   Set<Watcher> result = new HashSet<Watcher>();


   switch (type) { // 确定事件类型

       case None: // 无类型

           // 添加默认Watcher

           result.add(defaultWatcher);

           // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、

           // Zookeeper的状态是否为同步连接)

           boolean clear = ClientCnxn.getDisableAutoResetWatch() &&

               state != Watcher.Event.KeeperState.SyncConnected;

  // 同步块

           synchronized(dataWatches) {

               for(Set<Watcher> ws: dataWatches.values()) {

                   // 添加至结果集合

                   result.addAll(ws);

               }

               if (clear) { // 是否需要清空

                   dataWatches.clear();

               }

           }

           // 同步块

           synchronized(existWatches) {  

               for(Set<Watcher> ws: existWatches.values()) {

                   // 添加至结果集合

                   result.addAll(ws);

               }

               if (clear) { // 是否需要清空

                   existWatches.clear();

               }

           }

           

  // 同步块

           synchronized(childWatches) {

               for(Set<Watcher> ws: childWatches.values()) {

                   // 添加至结果集合

                   result.addAll(ws);

               }

               if (clear) { // 是否需要清空

                   childWatches.clear();

               }

           }

           // 返回结果

           return result;

       case NodeDataChanged: // 节点数据变化

       case NodeCreated: // 创建节点

           synchronized (dataWatches) { // 同步块

               // 移除clientPath对应的Watcher后全部添加至结果集合

               addTo(dataWatches.remove(clientPath), result);

           }

           synchronized (existWatches) {

               // 移除clientPath对应的Watcher后全部添加至结果集合

               addTo(existWatches.remove(clientPath), result);

           }

           break;

       case NodeChildrenChanged: // 节点子节点变化

           synchronized (childWatches) {

               // 移除clientPath对应的Watcher后全部添加至结果集合

               addTo(childWatches.remove(clientPath), result);

           }

           break;

       case NodeDeleted: // 删除节点

           synchronized (dataWatches) {

               // 移除clientPath对应的Watcher后全部添加至结果集合

               addTo(dataWatches.remove(clientPath), result);

           }

           // XXX This shouldn't be needed, but just in case

           synchronized (existWatches) {

               // 移除clientPath对应的Watcher

               Set<Watcher> list = existWatches.remove(clientPath);

               if (list != null) {

                   // 移除clientPath对应的Watcher后全部添加至结果集合

                   addTo(existWatches.remove(clientPath), result);

                   LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");

               }

           }

           synchronized (childWatches) {

               // 移除clientPath对应的Watcher后全部添加至结果集合

               addTo(childWatches.remove(clientPath), result);

           }

           break;

       default: // 缺省处理

           String msg = "Unhandled watch event type " + type

               + " with state " + state + " on path " + clientPath;

           LOG.error(msg);

           throw new RuntimeException(msg);

   }


   // 返回结果集合

   return result;

}

说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:

如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;

针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

八、总结

  针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:

  • 事件的变化,状态的定义依赖于Event内部类的两组枚举值
  • 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下

相关文章
|
5月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
173 5
|
NoSQL 安全 调度
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
Redisson 的 MultiLock 是一种分布式锁实现,支持对多个独立的 RLock 同时加锁或解锁。它通过“整锁整放”机制确保所有锁要么全部加锁成功,要么完全回滚,避免状态不一致。适用于跨多个 Redis 实例或节点的场景,如分布式任务调度。其核心逻辑基于遍历加锁列表,失败时自动释放已获取的锁,保证原子性。解锁时亦逐一操作,降低死锁风险。MultiLock 不依赖 Lua 脚本,而是封装多锁协调,满足高一致性需求的业务场景。
126 0
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
|
6月前
|
安全
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁
Redisson 的看门狗机制是解决分布式锁续期问题的核心功能。当通过 `lock()` 方法加锁且未指定租约时间时,默认启用 30 秒的看门狗超时时间。其原理是在获取锁后创建一个定时任务,每隔 1/3 超时时间(默认 10 秒)通过 Lua 脚本检查锁状态并延长过期时间。续期操作异步执行,确保业务线程不被阻塞,同时仅当前持有锁的线程可成功续期。锁释放时自动清理看门狗任务,避免资源浪费。学习源码后需注意:避免使用带超时参数的加锁方法、控制业务执行时间、及时释放锁以优化性能。相比手动循环续期,Redisson 的定时任务方式更高效且安全。
327 24
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁
|
6月前
【📕分布式锁通关指南 08】源码剖析redisson可重入锁之释放及阻塞与非阻塞获取
本文深入剖析了Redisson中可重入锁的释放锁Lua脚本实现及其获取锁的两种方式(阻塞与非阻塞)。释放锁流程包括前置检查、重入计数处理、锁删除及消息发布等步骤。非阻塞获取锁(tryLock)通过有限时间等待返回布尔值,适合需快速反馈的场景;阻塞获取锁(lock)则无限等待直至成功,适用于必须获取锁的场景。两者在等待策略、返回值和中断处理上存在显著差异。本文为理解分布式锁实现提供了详实参考。
226 11
【📕分布式锁通关指南 08】源码剖析redisson可重入锁之释放及阻塞与非阻塞获取
|
5月前
|
存储 安全 NoSQL
【📕分布式锁通关指南 09】源码剖析redisson之公平锁的实现
本文深入解析了 Redisson 中公平锁的实现原理。公平锁通过确保线程按请求顺序获取锁,避免“插队”现象。在 Redisson 中,`RedissonFairLock` 类的核心逻辑包含加锁与解锁两部分:加锁时,线程先尝试直接获取锁,失败则将自身信息加入 ZSet 等待队列,只有队首线程才能获取锁;解锁时,验证持有者身份并减少重入计数,最终删除锁或通知等待线程。其“公平性”源于 Lua 脚本的原子性操作:线程按时间戳排队、仅队首可尝试加锁、实时发布锁释放通知。这些设计确保了分布式环境下的线程安全与有序执行。
154 0
【📕分布式锁通关指南 09】源码剖析redisson之公平锁的实现
|
11月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
379 3
|
6月前
|
NoSQL Java Redis
【📕分布式锁通关指南 06】源码剖析redisson可重入锁之加锁
本文详细解析了Redisson可重入锁的加锁流程。首先从`RLock.lock()`方法入手,通过获取当前线程ID并调用`tryAcquire`尝试加锁。若加锁失败,则订阅锁释放通知并循环重试。核心逻辑由Lua脚本实现:检查锁是否存在,若不存在则创建并设置重入次数为1;若存在且为当前线程持有,则重入次数+1。否则返回锁的剩余过期时间。此过程展示了Redisson高效、可靠的分布式锁机制。
203 0
【📕分布式锁通关指南 06】源码剖析redisson可重入锁之加锁
|
8月前
|
人工智能 安全 Java
微服务引擎 MSE:打造通用的企业级微服务架构
微服务引擎MSE致力于打造通用的企业级微服务架构,涵盖四大核心内容:微服务技术趋势与挑战、MSE应对方案、拥抱开源及最佳实践。MSE通过流量入口、内部流量管理、服务治理等模块,提供高可用、跨语言支持和性能优化。此外,MSE坚持开放,推动云原生与AI融合,助力企业实现无缝迁移和高效运维。
287 1
|
11月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
156 3
|
11月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
155 1

相关产品

  • 微服务引擎