4.Watcher机制(一)

简介: 本文深入分析ZooKeeper的Watcher机制核心类及源码实现,涵盖Watcher接口、Event枚举(KeeperState与EventType)、WatchedEvent事件封装、ClientWatchManager与ZKWatchManager的Watcher管理逻辑,重点解析事件触发时的监听器回调与移除机制,帮助理解ZooKeeper分布式协调中的状态通知原理。

一、前言
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
二、总体框图
  对于Watcher机制而言,主要涉及的类主要如下。   

说明:
Watcher 接口类型,其定义了process方法,需子类实现
Event 接口类型,Watcher的内部类,无任何方法
KeeperState 枚举类型,Event的内部类,表示Zookeeper所处的状态
EventType 枚举类型,Event的内部类,表示Zookeeper中发生的事件类型
WatchedEvent 表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType
ClientWatchManager 接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现
ZKWatchManager Zookeeper的内部类,继承ClientWatchManager
MyWatcher ZooKeeperMain的内部类,继承Watcher
ServerCnxn 接口类型,继承Watcher,表示客户端与服务端的一个连接
WatchManager 管理Watcher
三、Watcher源码分析
3.1 内部类
  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下
public interface Watcher {

public interface Event {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    public enum KeeperState {

        @Deprecated
        Unknown (-1),

        Disconnected (0),

        @Deprecated
        NoSyncConnected (1),

        SyncConnected (3),

        AuthFailed (4),

        ConnectedReadOnly (5),

        SaslAuthenticated(6),

        Expired (-112);

        private final int intValue;     

        KeeperState(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static KeeperState fromInt(int intValue) {
            switch(intValue) {
                case   -1: return KeeperState.Unknown;
                case    0: return KeeperState.Disconnected;
                case    1: return KeeperState.NoSyncConnected;
                case    3: return KeeperState.SyncConnected;
                case    4: return KeeperState.AuthFailed;
                case    5: return KeeperState.ConnectedReadOnly;
                case    6: return KeeperState.SaslAuthenticated;
                case -112: return KeeperState.Expired;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to KeeperState");
            }
        }
    }

    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    public enum EventType {
        None (-1),
        NodeCreated (1),
        NodeDeleted (2),
        NodeDataChanged (3),
        NodeChildrenChanged (4);

        private final int intValue;     

        EventType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static EventType fromInt(int intValue) {
            switch(intValue) {
                case -1: return EventType.None;
                case  1: return EventType.NodeCreated;
                case  2: return EventType.NodeDeleted;
                case  3: return EventType.NodeDataChanged;
                case  4: return EventType.NodeChildrenChanged;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to EventType");
            }
        }           
    }
}

}
说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。
可以简化成:
public interface Event {}
3.2 接口方法  
abstract public void process(WatchedEvent event);
说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。
四、Event源码分析(即3.1内部类)
4.1 内部类

  1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态
/* Unused, this state is never generated by the server /
@Deprecated
// 未知状态,不再使用,服务器不会产生此状态
Unknown (-1),

/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
// 断开
Disconnected (0),

/** Unused, this state is never generated by the server */
@Deprecated
// 未同步连接,不再使用,服务器不会产生此状态
NoSyncConnected (1),

/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
// 同步连接状态
SyncConnected (3),

/**
* Auth failed state
*/
// 认证失败状态
AuthFailed (4),

/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
// 只读连接状态
ConnectedReadOnly (5),

/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
// SASL认证通过状态
SaslAuthenticated(6),

/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
// 过期状态
Expired (-112);

// 代表状态的整形值
private final int intValue;     // Integer representation of value
// for sending over wire


// 构造函数
KeeperState(int intValue) {
    this.intValue = intValue;
}

// 返回整形值
public int getIntValue() {
    return intValue;
}

// 从整形值构造相应的状态
public static KeeperState fromInt(int intValue) {
    switch(intValue) {
        case   -1: return KeeperState.Unknown;
        case    0: return KeeperState.Disconnected;
        case    1: return KeeperState.NoSyncConnected;
        case    3: return KeeperState.SyncConnected;
        case    4: return KeeperState.AuthFailed;
        case    5: return KeeperState.ConnectedReadOnly;
        case    6: return KeeperState.SaslAuthenticated;
        case -112: return KeeperState.Expired;

        default:
            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
    }
}

}
说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  1. EventType 
    public enum EventType { // 事件类型
    // 无
    None (-1),
    // 结点创建
    NodeCreated (1),
    // 结点删除
    NodeDeleted (2),
    // 结点数据变化
    NodeDataChanged (3),
    // 结点子节点变化
    NodeChildrenChanged (4);

    // 代表事件类型的整形
    private final int intValue; // Integer representation of value
    // for sending over wire

    // 构造函数
    EventType(int intValue) {

     this.intValue = intValue;
    

    }

    // 返回整形
    public int getIntValue() {

     return intValue;
    

    }

    // 从整形构造相应的事件
    public static EventType fromInt(int intValue) {

     switch(intValue) {
         case -1: return EventType.None;
         case  1: return EventType.NodeCreated;
         case  2: return EventType.NodeDeleted;
         case  3: return EventType.NodeDataChanged;
         case  4: return EventType.NodeChildrenChanged;
    
         default:
             throw new RuntimeException("Invalid integer value for conversion to EventType");
     }
    

    }
    }
    说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。
    五、WatchedEvent
    5.1 类的属性 
    public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
    }
    说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。
    5.2 构造函数
      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
    // 初始化属性
    this.keeperState = keeperState;
    this.eventType = eventType;
    this.path = path;
    }
      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。
      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  
    public WatchedEvent(WatcherEvent eventMessage) {
    // 从eventMessage中取出相应属性进行赋值
    keeperState = KeeperState.fromInt(eventMessage.getState());
    eventType = EventType.fromInt(eventMessage.getType());
    path = eventMessage.getPath();
    }
      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。
    五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。
    六、ClientWatchManager
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type, String path);
    

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
    七、ZKWatchManager(zookeeper内)
    7.1 类的属性
    private static class ZKWatchManager implements ClientWatchManager {

    // 数据变化的Watchers
    private final Map> dataWatches = new HashMap>();

    // 节点存在与否的Watchers
    private final Map> existWatches = new HashMap>();

    // 子节点变化的Watchers
    private final Map> childWatches = new HashMap>();
    }
     说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
    7.2 核心方法分析

  2. materialize方法
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type,
                             String clientPath)
    

    {
    // 新生成结果Watcher集合
    Set result = new HashSet();

    switch (type) { // 确定事件类型

     case None: // 无类型
         // 添加默认Watcher
         result.add(defaultWatcher);
         // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、
         // Zookeeper的状态是否为同步连接)
         boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
             state != Watcher.Event.KeeperState.SyncConnected;
         // 同步块
         synchronized(dataWatches) { 
             for(Set<Watcher> ws: dataWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 dataWatches.clear();
             }
         }
    
         // 同步块
         synchronized(existWatches) {  
             for(Set<Watcher> ws: existWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 existWatches.clear();
             }
         }
    
         // 同步块
         synchronized(childWatches) { 
             for(Set<Watcher> ws: childWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 childWatches.clear();
             }
         }
         // 返回结果
         return result;
     case NodeDataChanged: // 节点数据变化
     case NodeCreated: // 创建节点
         synchronized (dataWatches) { // 同步块
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         synchronized (existWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(existWatches.remove(clientPath), result);
         }
         break;
     case NodeChildrenChanged: // 节点子节点变化
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     case NodeDeleted: // 删除节点
         synchronized (dataWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         // XXX This shouldn't be needed, but just in case
         synchronized (existWatches) {
             // 移除clientPath对应的Watcher
             Set<Watcher> list = existWatches.remove(clientPath);
             if (list != null) {
                 // 移除clientPath对应的Watcher后全部添加至结果集合
                 addTo(existWatches.remove(clientPath), result);
                 LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
             }
         }
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     default: // 缺省处理
         String msg = "Unhandled watch event type " + type
             + " with state " + state + " on path " + clientPath;
         LOG.error(msg);
         throw new RuntimeException(msg);
    

    }

    // 返回结果集合
    return result;
    }
    说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:
    如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;
    针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
    八、总结
      针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:
    ● 事件的变化,状态的定义依赖于Event内部类的两组枚举值
    ● 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下

相关文章
|
6天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
2557 18
|
18天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
16044 48
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
24天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34944 57
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
13天前
|
人工智能 JavaScript Ubuntu
低成本搭建AIP自动化写作系统:Hermes保姆级使用教程,长文和逐步实操贴图
我带着怀疑的态度,深度使用了几天,聚焦微信公众号AIP自动化写作场景,写出来的几篇文章,几乎没有什么修改,至少合乎我本人的意愿,而且排版风格,也越来越完善,同样是起码过得了我自己这一关。 这个其实OpenClaw早可以实现了,但是目前我觉得最大的区别是,Hermes会自主总结提炼,并更新你的写作技能。 相信就冲这一点,就值得一试。 这篇帖子主要就Hermes部署使用,作一个非常详细的介绍,几乎一步一贴图。 关于Hermes,无论你赞成哪种声音,我希望都是你自己动手行动过,发自内心的选择!
3057 29
|
3天前
|
云安全 人工智能 安全
|
3天前
|
人工智能 测试技术 API
阿里Qwen3.6-27B正式开源:网友直呼“太牛了”!
阿里云千问3.6系列重磅开源Qwen3.6-27B稠密大模型!官网:https://t.aliyun.com/U/JbblVp 仅270亿参数,编程能力媲美千亿模型,在SWE-bench等权威基准中表现卓越。支持多模态理解、本地部署及OpenClaw等智能体集成,已开放Hugging Face与ModelScope下载。
|
2天前
|
机器学习/深度学习 缓存 测试技术
DeepSeek-V4开源:百万上下文,Agent能力比肩顶级闭源模型
DeepSeek-V4正式开源!含V4-Pro(1.6T参数)与V4-Flash(284B参数)双版本,均支持百万token上下文。首创混合注意力架构,Agent能力、世界知识与推理性能全面领先开源模型,数学/代码评测比肩顶级闭源模型。
1372 6

热门文章

最新文章