Watcher机制(一)

简介: 本文深入分析ZooKeeper的Watcher机制核心类及源码实现,涵盖Watcher接口、Event枚举(KeeperState与EventType)、WatchedEvent事件封装、ClientWatchManager客户端管理器及ZKWatchManager的具体实现,揭示事件触发与Watcher通知机制的底层逻辑。

一、前言
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
二、总体框图
  对于Watcher机制而言,主要涉及的类主要如下。   

说明:
Watcher 接口类型,其定义了process方法,需子类实现
Event 接口类型,Watcher的内部类,无任何方法
KeeperState 枚举类型,Event的内部类,表示Zookeeper所处的状态
EventType 枚举类型,Event的内部类,表示Zookeeper中发生的事件类型
WatchedEvent 表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType
ClientWatchManager 接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现
ZKWatchManager Zookeeper的内部类,继承ClientWatchManager
MyWatcher ZooKeeperMain的内部类,继承Watcher
ServerCnxn 接口类型,继承Watcher,表示客户端与服务端的一个连接
WatchManager 管理Watcher
三、Watcher源码分析
3.1 内部类
  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下
public interface Watcher {

public interface Event {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    public enum KeeperState {

        @Deprecated
        Unknown (-1),

        Disconnected (0),

        @Deprecated
        NoSyncConnected (1),

        SyncConnected (3),

        AuthFailed (4),

        ConnectedReadOnly (5),

        SaslAuthenticated(6),

        Expired (-112);

        private final int intValue;     

        KeeperState(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static KeeperState fromInt(int intValue) {
            switch(intValue) {
                case   -1: return KeeperState.Unknown;
                case    0: return KeeperState.Disconnected;
                case    1: return KeeperState.NoSyncConnected;
                case    3: return KeeperState.SyncConnected;
                case    4: return KeeperState.AuthFailed;
                case    5: return KeeperState.ConnectedReadOnly;
                case    6: return KeeperState.SaslAuthenticated;
                case -112: return KeeperState.Expired;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to KeeperState");
            }
        }
    }

    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    public enum EventType {
        None (-1),
        NodeCreated (1),
        NodeDeleted (2),
        NodeDataChanged (3),
        NodeChildrenChanged (4);

        private final int intValue;     

        EventType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static EventType fromInt(int intValue) {
            switch(intValue) {
                case -1: return EventType.None;
                case  1: return EventType.NodeCreated;
                case  2: return EventType.NodeDeleted;
                case  3: return EventType.NodeDataChanged;
                case  4: return EventType.NodeChildrenChanged;

                default:
                    throw new RuntimeException("Invalid integer value for conversion to EventType");
            }
        }           
    }
}

}
说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。
可以简化成:
public interface Event {}
3.2 接口方法  
abstract public void process(WatchedEvent event);
说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。
四、Event源码分析(即3.1内部类)
4.1 内部类

  1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态
/* Unused, this state is never generated by the server /
@Deprecated
// 未知状态,不再使用,服务器不会产生此状态
Unknown (-1),

/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
// 断开
Disconnected (0),

/** Unused, this state is never generated by the server */
@Deprecated
// 未同步连接,不再使用,服务器不会产生此状态
NoSyncConnected (1),

/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
// 同步连接状态
SyncConnected (3),

/**
* Auth failed state
*/
// 认证失败状态
AuthFailed (4),

/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
// 只读连接状态
ConnectedReadOnly (5),

/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
// SASL认证通过状态
SaslAuthenticated(6),

/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
// 过期状态
Expired (-112);

// 代表状态的整形值
private final int intValue;     // Integer representation of value
// for sending over wire


// 构造函数
KeeperState(int intValue) {
    this.intValue = intValue;
}

// 返回整形值
public int getIntValue() {
    return intValue;
}

// 从整形值构造相应的状态
public static KeeperState fromInt(int intValue) {
    switch(intValue) {
        case   -1: return KeeperState.Unknown;
        case    0: return KeeperState.Disconnected;
        case    1: return KeeperState.NoSyncConnected;
        case    3: return KeeperState.SyncConnected;
        case    4: return KeeperState.AuthFailed;
        case    5: return KeeperState.ConnectedReadOnly;
        case    6: return KeeperState.SaslAuthenticated;
        case -112: return KeeperState.Expired;

        default:
            throw new RuntimeException("Invalid integer value for conversion to KeeperState");
    }
}

}
说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  1. EventType 
    public enum EventType { // 事件类型
    // 无
    None (-1),
    // 结点创建
    NodeCreated (1),
    // 结点删除
    NodeDeleted (2),
    // 结点数据变化
    NodeDataChanged (3),
    // 结点子节点变化
    NodeChildrenChanged (4);

    // 代表事件类型的整形
    private final int intValue; // Integer representation of value
    // for sending over wire

    // 构造函数
    EventType(int intValue) {

     this.intValue = intValue;
    

    }

    // 返回整形
    public int getIntValue() {

     return intValue;
    

    }

    // 从整形构造相应的事件
    public static EventType fromInt(int intValue) {

     switch(intValue) {
         case -1: return EventType.None;
         case  1: return EventType.NodeCreated;
         case  2: return EventType.NodeDeleted;
         case  3: return EventType.NodeDataChanged;
         case  4: return EventType.NodeChildrenChanged;
    
         default:
             throw new RuntimeException("Invalid integer value for conversion to EventType");
     }
    

    }
    }
    说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。
    五、WatchedEvent
    5.1 类的属性 
    public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
    }
    说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。
    5.2 构造函数
      1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
    // 初始化属性
    this.keeperState = keeperState;
    this.eventType = eventType;
    this.path = path;
    }
      说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。
      2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  
    public WatchedEvent(WatcherEvent eventMessage) {
    // 从eventMessage中取出相应属性进行赋值
    keeperState = KeeperState.fromInt(eventMessage.getState());
    eventType = EventType.fromInt(eventMessage.getType());
    path = eventMessage.getPath();
    }
      说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。
    五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。
    六、ClientWatchManager
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type, String path);
    

      说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
    七、ZKWatchManager(zookeeper内)
    7.1 类的属性
    private static class ZKWatchManager implements ClientWatchManager {

    // 数据变化的Watchers
    private final Map> dataWatches = new HashMap>();

    // 节点存在与否的Watchers
    private final Map> existWatches = new HashMap>();

    // 子节点变化的Watchers
    private final Map> childWatches = new HashMap>();
    }
     说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
    7.2 核心方法分析

  2. materialize方法
    public Set materialize(Watcher.Event.KeeperState state,

                             Watcher.Event.EventType type,
                             String clientPath)
    

    {
    // 新生成结果Watcher集合
    Set result = new HashSet();

    switch (type) { // 确定事件类型

     case None: // 无类型
         // 添加默认Watcher
         result.add(defaultWatcher);
         // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、
         // Zookeeper的状态是否为同步连接)
         boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
             state != Watcher.Event.KeeperState.SyncConnected;
         // 同步块
         synchronized(dataWatches) { 
             for(Set<Watcher> ws: dataWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 dataWatches.clear();
             }
         }
    
         // 同步块
         synchronized(existWatches) {  
             for(Set<Watcher> ws: existWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 existWatches.clear();
             }
         }
    
         // 同步块
         synchronized(childWatches) { 
             for(Set<Watcher> ws: childWatches.values()) {
                 // 添加至结果集合
                 result.addAll(ws);
             }
             if (clear) { // 是否需要清空
                 childWatches.clear();
             }
         }
         // 返回结果
         return result;
     case NodeDataChanged: // 节点数据变化
     case NodeCreated: // 创建节点
         synchronized (dataWatches) { // 同步块
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         synchronized (existWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(existWatches.remove(clientPath), result);
         }
         break;
     case NodeChildrenChanged: // 节点子节点变化
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     case NodeDeleted: // 删除节点
         synchronized (dataWatches) { 
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(dataWatches.remove(clientPath), result);
         }
         // XXX This shouldn't be needed, but just in case
         synchronized (existWatches) {
             // 移除clientPath对应的Watcher
             Set<Watcher> list = existWatches.remove(clientPath);
             if (list != null) {
                 // 移除clientPath对应的Watcher后全部添加至结果集合
                 addTo(existWatches.remove(clientPath), result);
                 LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
             }
         }
         synchronized (childWatches) {
             // 移除clientPath对应的Watcher后全部添加至结果集合
             addTo(childWatches.remove(clientPath), result);
         }
         break;
     default: // 缺省处理
         String msg = "Unhandled watch event type " + type
             + " with state " + state + " on path " + clientPath;
         LOG.error(msg);
         throw new RuntimeException(msg);
    

    }

    // 返回结果集合
    return result;
    }
    说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:
    如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;
    针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
    八、总结
      针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:
    ● 事件的变化,状态的定义依赖于Event内部类的两组枚举值
    ● 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下

相关文章
|
2月前
|
Java 大数据 Apache
Excel工具-HUTOOL-输出Excel
Hutool基于Apache POI封装了Excel读写功能,提供ExcelWriter和BigExcelWriter类,支持写出List、Map、Bean等数据类型到Excel,可自定义样式、多sheet操作,并解决大数据量导出时的内存溢出问题,适用于文件导出、客户端下载等场景。
|
2月前
|
Apache
Excel工具-HUTOOL-读取Excel
基于Hutool和Apache POI,封装Excel读取工具,支持xls/xlsx格式。可读取为List、Map或Bean,提供Sax模式(Excel03SaxReader/Excel07SaxReader)高效处理大文件,避免内存溢出,适用于海量数据流式读取。
|
2月前
|
Java 数据库 Sentinel
服务保护、分布式事务
本课程深入讲解微服务保护与分布式事务控制。内容涵盖:微服务雪崩问题及熔断、降级、限流、线程隔离等防护机制;基于Sentinel实现熔断降级与流量控制;掌握FallbackFactory与@SentinelResource注解的使用;理解CAP原理与分布式事务场景;通过Seata实现AT模式分布式事务,保障系统高可用与数据一致性。(238字)
|
2月前
|
XML Dubbo Java
搭建dubbo-zk应用
基于Spring Boot 2.2.2与Dubbo 2.0.0,构建ZooKeeper注册中心的分布式服务架构。通过Maven多模块设计,分离API、Provider与Consumer,实现服务暴露与远程调用,结合Lombok简化开发,完成Dubbo服务的注册、发现与HTTP接口验证。
|
2月前
|
Dubbo Java 应用服务中间件
入门运行Soul
Soul 是基于 WebFlux 的高性能响应式 API 网关,支持 Dubbo、Spring Cloud、Spring Boot,具备跨语言、异步、低延迟(1~2ms)特性。采用插件化设计,支持热插拔、动态流量控制、A/B 测试、蓝绿发布,内置鉴权、限流、熔断等丰富插件,数据同步支持 WebSocket、HTTP 长轮询、Zookeeper。
|
2月前
|
消息中间件 算法 网络协议
选举机制源码分析
本文深入解析ZooKeeper中FastLeaderElection选举算法的源码实现,涵盖Election接口、FastLeaderElection核心类及其内部类Notification、ToSend、Messenger的工作机制。重点分析了选票收发、PK规则(epoch、zxid、id)、投票统计与Leader确定流程,全面揭示ZooKeeper领导者选举的底层原理与实现细节。
|
2月前
|
缓存 安全
Watcher机制(二)WatchManager
本文深入分析ZooKeeper中WatchManager类的源码,重点解析其核心数据结构与方法。通过watchTable和watch2Paths两个映射维护路径与Watcher间的双向关系,实现高效的事件监听管理。类中size、addWatch、removeWatcher、triggerWatch等同步方法确保线程安全,支持Watcher的添加、删除与事件触发;dumpWatches则用于调试信息输出。整体设计简洁高效,是ZooKeeper事件通知机制的核心组件。(238字)
|
2月前
|
消息中间件 算法 网络协议
选举机制理解描述
本文深入解析Zookeeper的Leader选举机制,涵盖启动期与运行期的选举流程,详述FastLeaderElection算法核心规则:优先比较ZXID(数据最新性),再比较SID(服务器标识)。通过投票PK、逻辑时钟(electionEpoch)同步与状态变更,确保集群在无主或主节点故障时快速选出新Leader,保障分布式数据一致性。同时介绍QuorumCnxManager网络通信、选票管理及多轮投票统计等实现细节,揭示Zookeeper高可用背后的原理。
|
2月前
|
Kubernetes IDE Java
部署篇(开发部署)
本文介绍如何将SpringCloud应用部署至Kubernetes云端。通过EDAS导入ACK集群并初始化应用,利用IDE插件快速部署JAR/WAR包,提升开发效率。后续将讲解运维视角的自动化发布流程。
Watcher机制(三)之ZooKeeper
本文深入分析ZooKeeper客户端核心类,涵盖其内部类结构、属性、构造函数及关键操作方法。重点解析WatchRegistration体系及create、delete、exists等同步/异步实现机制,揭示ZooKeeper客户端与服务端交互原理。