Zookeeper4.Watcher机制(一)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 本文深入分析ZooKeeper的Watcher机制核心类与源码实现,涵盖Watcher接口、Event枚举(KeeperState、EventType)、WatchedEvent事件封装、ClientWatchManager及ZKWatchManager的管理逻辑,重点解析事件触发与Watcher通知机制,帮助理解ZooKeeper分布式协调中的状态监听与回调原理。

一、前言
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
二、总体框图
  对于Watcher机制而言,主要涉及的类主要如下。   

说明:
Watcher 接口类型,其定义了process方法,需子类实现
Event 接口类型,Watcher的内部类,无任何方法
KeeperState 枚举类型,Event的内部类,表示Zookeeper所处的状态
EventType 枚举类型,Event的内部类,表示Zookeeper中发生的事件类型
WatchedEvent 表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType
ClientWatchManager 接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现
ZKWatchManager Zookeeper的内部类,继承ClientWatchManager
MyWatcher ZooKeeperMain的内部类,继承Watcher
ServerCnxn 接口类型,继承Watcher,表示客户端与服务端的一个连接
WatchManager 管理Watcher
三、Watcher源码分析
3.1 内部类
  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下
public interface Watcher {

public interface Event {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    public enum KeeperState {

        @Deprecated
        Unknown (-1),

        Disconnected (0),

        @Deprecated
        NoSyncConnected (1),

        SyncConnected (3),

        AuthFailed (4),

        ConnectedReadOnly (5),

        SaslAuthenticated(6),

        Expired (-112);

        private final int intValue;     

        KeeperState(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static KeeperState fromInt(int intValue) {
            switch(intValue) {
                case   -1: return KeeperState.Unknown;
                case    0: return KeeperState.Disconnected;
                case    1: return KeeperState.NoSyncConnected;
                case    3: return KeeperState.SyncConnected;
                case    4: return KeeperState.AuthFailed;
                case    5: return KeeperState.ConnectedReadOnly;
                case    6: return KeeperState.SaslAuthenticated;
                case -112: return KeeperState.Expired;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to KeeperState");
            }
        }
    }

    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    public enum EventType {
        None (-1),
        NodeCreated (1),
        NodeDeleted (2),
        NodeDataChanged (3),
        NodeChildrenChanged (4);

        private final int intValue;     

        EventType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static EventType fromInt(int intValue) {
            switch(intValue) {
                case -1: return EventType.None;
                case  1: return EventType.NodeCreated;
                case  2: return EventType.NodeDeleted;
                case  3: return EventType.NodeDataChanged;
                case  4: return EventType.NodeChildrenChanged;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to EventType");
            }
        }           
    }
}

}
说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。
可以简化成:
public interface Event {}
3.2 接口方法  
abstract public void process(WatchedEvent event);
说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。
四、Event源码分析(即3.1内部类)
4.1 内部类

  1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态
/* Unused, this state is never generated by the server /
@Deprecated
// 未知状态,不再使用,服务器不会产生此状态
Unknown (-1),

/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
// 断开
Disconnected (0),

/** Unused, this state is never generated by the server */
@Deprecated
// 未同步连接,不再使用,服务器不会产生此状态
NoSyncConnected (1),

/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
// 同步连接状态
SyncConnected (3),

/**
* Auth failed state
*/
// 认证失败状态
AuthFailed (4),

/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
// 只读连接状态
ConnectedReadOnly (5),

/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
// SASL认证通过状态
SaslAuthenticated(6),

/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
// 过期状态
Expired (-112);

// 代表状态的整形值
private final int intValue;     // Integer representation of value
// for sending over wire


// 构造函数
KeeperState(int intValue) {
    this.intValue = intValue;
}

// 返回整形值
public int getIntValue() {
    return intValue;
}

// 从整形值构造相应的状态
public static KeeperState fromInt(int intValue) {
    switch(intValue) {
        case   -1: return KeeperState.Unknown;
        case    0: return KeeperState.Disconnected;
        case    1: return KeeperState.NoSyncConnected;
        case    3: return KeeperState.SyncConnected;
        case    4: return KeeperState.AuthFailed;
        case    5: return KeeperState.ConnectedReadOnly;
        case    6: return KeeperState.SaslAuthenticated;
        case -112: return KeeperState.Expired;

        default:
            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
    }
}

}
说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  1. EventType 
    public enum EventType { // 事件类型
    // 无
    None (-1),
    // 结点创建
    NodeCreated (1),
    // 结点删除
    NodeDeleted (2),
    // 结点数据变化
    NodeDataChanged (3),
    // 结点子节点变化
    NodeChildrenChanged (4);

    // 代表事件类型的整形
    private final int intValue; // Integer representation of value
    // for sending over wire

    // 构造函数
    EventType(int intValue) {

     this.intValue = intValue;
    

    }

    // 返回整形
    public int getIntValue() {

     return intValue;
    

    }

    // 从整形构造相应的事件
    public static EventType fromInt(int intValue) {

     switch(intValue) {
         case -1: return EventType.None;
         case  1: return EventType.NodeCreated;
         case  2: return EventType.NodeDeleted;
         case  3: return EventType.NodeDataChanged;
         case  4: return EventType.NodeChildrenChanged;
    
         default:
             throw new RuntimeException("Invalid integer value for conversion to EventType");
     }
    

    }
    }
    说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。
    五、WatchedEvent
    5.1 类的属性 
    public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
    }
    说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。
    5.2 构造函数
      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
    // 初始化属性
    this.keeperState = keeperState;
    this.eventType = eventType;
    this.path = path;
    }
      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。
      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  
    public WatchedEvent(WatcherEvent eventMessage) {
    // 从eventMessage中取出相应属性进行赋值
    keeperState = KeeperState.fromInt(eventMessage.getState());
    eventType = EventType.fromInt(eventMessage.getType());
    path = eventMessage.getPath();
    }
      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。
    五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。
    六、ClientWatchManager
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type, String path);
    

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
    七、ZKWatchManager(zookeeper内)
    7.1 类的属性
    private static class ZKWatchManager implements ClientWatchManager {

    // 数据变化的Watchers
    private final Map> dataWatches = new HashMap>();

    // 节点存在与否的Watchers
    private final Map> existWatches = new HashMap>();

    // 子节点变化的Watchers
    private final Map> childWatches = new HashMap>();
    }
     说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
    7.2 核心方法分析

  2. materialize方法
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type,
                             String clientPath)
    

    {
    // 新生成结果Watcher集合
    Set result = new HashSet();

    switch (type) { // 确定事件类型

     case None: // 无类型
         // 添加默认Watcher
         result.add(defaultWatcher);
         // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、
         // Zookeeper的状态是否为同步连接)
         boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
             state != Watcher.Event.KeeperState.SyncConnected;
         // 同步块
         synchronized(dataWatches) { 
             for(Set<Watcher> ws: dataWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 dataWatches.clear();
             }
         }
    
         // 同步块
         synchronized(existWatches) {  
             for(Set<Watcher> ws: existWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 existWatches.clear();
             }
         }
    
         // 同步块
         synchronized(childWatches) { 
             for(Set<Watcher> ws: childWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 childWatches.clear();
             }
         }
         // 返回结果
         return result;
     case NodeDataChanged: // 节点数据变化
     case NodeCreated: // 创建节点
         synchronized (dataWatches) { // 同步块
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         synchronized (existWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(existWatches.remove(clientPath), result);
         }
         break;
     case NodeChildrenChanged: // 节点子节点变化
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     case NodeDeleted: // 删除节点
         synchronized (dataWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         // XXX This shouldn't be needed, but just in case
         synchronized (existWatches) {
             // 移除clientPath对应的Watcher
             Set<Watcher> list = existWatches.remove(clientPath);
             if (list != null) {
                 // 移除clientPath对应的Watcher后全部添加至结果集合
                 addTo(existWatches.remove(clientPath), result);
                 LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
             }
         }
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     default: // 缺省处理
         String msg = "Unhandled watch event type " + type
             + " with state " + state + " on path " + clientPath;
         LOG.error(msg);
         throw new RuntimeException(msg);
    

    }

    // 返回结果集合
    return result;
    }
    说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:
    如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;
    针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
    八、总结
      针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:
    ● 事件的变化,状态的定义依赖于Event内部类的两组枚举值
    ● 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下

相关文章
|
2月前
|
存储 关系型数据库 调度
微服务原理篇(XXLJOB-幂等-MySQL)
本课程深入讲解微服务核心组件XXL-JOB任务调度原理,涵盖其架构、分布式任务处理、幂等性设计及MySQL存储引擎、索引机制、SQL优化与分库分表策略,全面提升系统性能与可靠性。
|
3月前
|
人工智能 自然语言处理 安全
2025年工业AI系统公司深度评测:领先企业如何以技术创新驱动制造业智能升级
树根科技工业AI系统,依托“1+2+N”架构,融合生成式与非生成式AI,构建覆盖企业经营与生产制造全链路的智能闭环。凭借根灵大模型、高精度视觉检测与能耗优化等技术突破,实现在装备制造、汽车、船舶等多行业落地,助力企业降本增效。评测显示其系统协同性强、实证效果显著,为制造业智能化升级提供可复制的系统化解决方案。
264 0
|
18天前
|
人工智能 开发框架 JSON
【RuoYi-SpringBoot3-Pro】:AI 能力再扩展,一个方法打通 n8n 工作流
RuoYi-SpringBoot3-Pro 集成 n8n,通过一个 Webhook 方法实现 AI 能力扩展。Java 端轻量触发,复杂 AI 工作流由 n8n 可视化编排,支持文本处理、文件上传等场景,灵活高效,助力企业级应用快速集成自动化能力。
140 5
|
5月前
|
机器学习/深度学习 算法 Java
基于灰狼优化算法(GWO)解决柔性作业车间调度问题(Matlab代码实现)
基于灰狼优化算法(GWO)解决柔性作业车间调度问题(Matlab代码实现)
340 1
|
5月前
|
安全 Linux iOS开发
Tenable Nessus 10.10 (macOS, Linux, Windows) - 漏洞评估解决方案
Tenable Nessus 10.10 (macOS, Linux, Windows) - 漏洞评估解决方案
390 0
Tenable Nessus 10.10 (macOS, Linux, Windows) - 漏洞评估解决方案
|
12月前
|
机器学习/深度学习 存储 传感器
《解锁深度Q网络新姿势:非马尔可夫环境难题》
深度Q网络(DQN)结合深度学习与Q学习,在Atari游戏等领域取得显著成绩,但在非马尔可夫环境中面临挑战。传统DQN基于马尔可夫决策过程(MDP),假设未来状态仅依赖当前状态和动作,忽视历史信息,导致在复杂环境中表现不佳。为此,研究人员提出了三种改进策略:1) 记忆增强型DQN,引入LSTM等记忆模块;2) 基于模型的强化学习结合,通过预测环境动态提升决策准确性;3) 多智能体协作与信息共享,利用多个智能体共同感知和决策。实验表明,这些改进有效提升了DQN在非马尔可夫环境中的性能,但计算复杂度和模型可解释性仍是未来研究的重点。
284 17
|
监控 安全 网络性能优化
|
Python
Flask学习笔记(二):基于Flask框架上传图片到服务器端并原名保存
关于如何使用Flask框架上传图片到服务器端并以其原名保存的教程。
607 1
|
开发框架 数据安全/隐私保护 Android开发
iOS二维码的生成和扫码详细介绍(手把手教)
iOS二维码的生成和扫码详细介绍(手把手教)
1074 0
|
弹性计算 安全 网络安全
阿里云ECS经典网络和专有网络有什么区别?
阿里云面向客户提供的网络类型服务有经典网络和专有网络两种,但这两者有什么区别呢?阿里官网给的解释是: 经典网络:IP地址由阿里云统一分配,配置简便,使用方便,适合对操作易用性要求比较高、需要快速使用 ECS 的用户。