Watcher机制(一)

简介: 本文深入分析Zookeeper的Watcher机制,涵盖核心类与源码实现。重点解析Watcher、Event、WatchedEvent等接口与类,梳理其内部结构及事件通知流程,帮助理解Zookeeper的数据变更监听原理。

一、前言

  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

二、总体框图

  对于Watcher机制而言,主要涉及的类主要如下。

  


说明:

Watcher

接口类型,其定义了process方法,需子类实现

Event

接口类型,Watcher的内部类,无任何方法

KeeperState

枚举类型,Event的内部类,表示Zookeeper所处的状态

EventType

枚举类型,Event的内部类,表示Zookeeper中发生的事件类型

WatchedEvent

表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType

ClientWatchManager

接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现

ZKWatchManager

Zookeeper的内部类,继承ClientWatchManager

MyWatcher

ZooKeeperMain的内部类,继承Watcher

ServerCnxn

接口类型,继承Watcher,表示客户端与服务端的一个连接

WatchManager

管理Watcher

三、Watcher源码分析

3.1 内部类

  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下

public interface Watcher {
    public interface Event {
        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        public enum KeeperState {
            @Deprecated
            Unknown (-1),
            Disconnected (0),
            @Deprecated
            NoSyncConnected (1),
            SyncConnected (3),
            AuthFailed (4),
            ConnectedReadOnly (5),
            SaslAuthenticated(6),
            Expired (-112);
            private final int intValue;     
            KeeperState(int intValue) {
                this.intValue = intValue;
            }
            public int getIntValue() {
                return intValue;
            }
            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;
                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }
        /**
         * Enumeration of types of events that may occur on the ZooKeeper
         */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);
            private final int intValue;     
            EventType(int intValue) {
                this.intValue = intValue;
            }
            public int getIntValue() {
                return intValue;
            }
            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;
                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }           
        }
    }
}

说明:可以看到,Event接口并没有定义任何属性和方法,但其包含了KeeperState和EventType两个内部枚举类。

可以简化成:

public interface Event {}

3.2 接口方法  

abstract public void process(WatchedEvent event);

说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

四、Event源码分析(即3.1内部类)

4.1 内部类

1. KeeperState 

public enum KeeperState { // 事件发生时Zookeeper的状态
    /** Unused, this state is never generated by the server */
    @Deprecated
    // 未知状态,不再使用,服务器不会产生此状态
    Unknown (-1), 
    /** The client is in the disconnected state - it is not connected
    * to any server in the ensemble. */
    // 断开
    Disconnected (0),
    /** Unused, this state is never generated by the server */
    @Deprecated
    // 未同步连接,不再使用,服务器不会产生此状态
    NoSyncConnected (1),
    /** The client is in the connected state - it is connected
    * to a server in the ensemble (one of the servers specified
    * in the host connection parameter during ZooKeeper client
    * creation). */
    // 同步连接状态
    SyncConnected (3),
    /**
    * Auth failed state
    */
    // 认证失败状态
    AuthFailed (4),
    /**
    * The client is connected to a read-only server, that is the
    * server which is not currently connected to the majority.
    * The only operations allowed after receiving this state is
    * read operations.
    * This state is generated for read-only clients only since
    * read/write clients aren't allowed to connect to r/o servers.
    */
    // 只读连接状态
    ConnectedReadOnly (5),
    /**
    * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
    * so that they can perform Zookeeper actions with their SASL-authorized permissions.
    */
    // SASL认证通过状态
    SaslAuthenticated(6),
    /** The serving cluster has expired this session. The ZooKeeper
    * client connection (the session) is no longer valid. You must
    * create a new client connection (instantiate a new ZooKeeper
    * instance) if you with to access the ensemble. */
    // 过期状态
    Expired (-112);
    // 代表状态的整形值
    private final int intValue;     // Integer representation of value
    // for sending over wire
    // 构造函数
    KeeperState(int intValue) {
        this.intValue = intValue;
    }
    // 返回整形值
    public int getIntValue() {
        return intValue;
    }
    // 从整形值构造相应的状态
    public static KeeperState fromInt(int intValue) {
        switch(intValue) {
            case   -1: return KeeperState.Unknown;
            case    0: return KeeperState.Disconnected;
            case    1: return KeeperState.NoSyncConnected;
            case    3: return KeeperState.SyncConnected;
            case    4: return KeeperState.AuthFailed;
            case    5: return KeeperState.ConnectedReadOnly;
            case    6: return KeeperState.SaslAuthenticated;
            case -112: return KeeperState.Expired;
            default:
                throw new RuntimeException("Invalid integer value for conversion to KeeperState");
        }
    }
}

说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

2. EventType 

public enum EventType { // 事件类型
    // 无
    None (-1),
    // 结点创建
    NodeCreated (1),
    // 结点删除
    NodeDeleted (2),
    // 结点数据变化
    NodeDataChanged (3),
    // 结点子节点变化
    NodeChildrenChanged (4);
    // 代表事件类型的整形 
    private final int intValue;     // Integer representation of value
    // for sending over wire
    // 构造函数
    EventType(int intValue) {
        this.intValue = intValue;
    }
    // 返回整形
    public int getIntValue() {
        return intValue;
    }
    // 从整形构造相应的事件
    public static EventType fromInt(int intValue) {
        switch(intValue) {
            case -1: return EventType.None;
            case  1: return EventType.NodeCreated;
            case  2: return EventType.NodeDeleted;
            case  3: return EventType.NodeDataChanged;
            case  4: return EventType.NodeChildrenChanged;
            default:
                throw new RuntimeException("Invalid integer value for conversion to EventType");
        }
    }           
}

说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

五、WatchedEvent

5.1 类的属性 

public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
}

说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

5.2 构造函数

  1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
    // 初始化属性
    this.keeperState = keeperState;
    this.eventType = eventType;
    this.path = path;
}

  说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

  2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

public WatchedEvent(WatcherEvent eventMessage) {
    // 从eventMessage中取出相应属性进行赋值
    keeperState = KeeperState.fromInt(eventMessage.getState());
    eventType = EventType.fromInt(eventMessage.getType());
    path = eventMessage.getPath();
}

  说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

五总结:对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

六、ClientWatchManager

public Set<Watcher> materialize(Watcher.Event.KeeperState state, 
                                Watcher.Event.EventType type, String path);

  说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

七、ZKWatchManager(zookeeper内)

7.1 类的属性

private static class ZKWatchManager implements ClientWatchManager {
    
    // 数据变化的Watchers
    private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
    
    // 节点存在与否的Watchers
    private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
    
    // 子节点变化的Watchers
    private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
}

 说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

7.2 核心方法分析

1. materialize方法

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    // 新生成结果Watcher集合
    Set<Watcher> result = new HashSet<Watcher>();
    switch (type) { // 确定事件类型
        case None: // 无类型
            // 添加默认Watcher
            result.add(defaultWatcher);
            // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、
            // Zookeeper的状态是否为同步连接)
            boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                state != Watcher.Event.KeeperState.SyncConnected;
      // 同步块
            synchronized(dataWatches) { 
                for(Set<Watcher> ws: dataWatches.values()) {
                    // 添加至结果集合
                    result.addAll(ws);
                }
                if (clear) { // 是否需要清空
                    dataWatches.clear();
                }
            }
      
            // 同步块
            synchronized(existWatches) {  
                for(Set<Watcher> ws: existWatches.values()) {
                    // 添加至结果集合
                    result.addAll(ws);
                }
                if (clear) { // 是否需要清空
                    existWatches.clear();
                }
            }
            
      // 同步块
            synchronized(childWatches) { 
                for(Set<Watcher> ws: childWatches.values()) {
                    // 添加至结果集合
                    result.addAll(ws);
                }
                if (clear) { // 是否需要清空
                    childWatches.clear();
                }
            }
            // 返回结果
            return result;
        case NodeDataChanged: // 节点数据变化
        case NodeCreated: // 创建节点
            synchronized (dataWatches) { // 同步块
                // 移除clientPath对应的Watcher后全部添加至结果集合
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) { 
                // 移除clientPath对应的Watcher后全部添加至结果集合
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        case NodeChildrenChanged: // 节点子节点变化
            synchronized (childWatches) {
                // 移除clientPath对应的Watcher后全部添加至结果集合
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        case NodeDeleted: // 删除节点
            synchronized (dataWatches) { 
                // 移除clientPath对应的Watcher后全部添加至结果集合
                addTo(dataWatches.remove(clientPath), result);
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {
                // 移除clientPath对应的Watcher
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(existWatches.remove(clientPath), result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                // 移除clientPath对应的Watcher后全部添加至结果集合
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        default: // 缺省处理
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
    }
    // 返回结果集合
    return result;
}

说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作:

如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;

针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

八、总结

  针对Watcher机制的第一部分的源码分析就已经完成,本章节需重点关注:

  • 事件的变化,状态的定义依赖于Event内部类的两组枚举值
  • 上下游调用关系图需记忆一下,为加强记忆,再最后再贴一下


相关文章
|
4月前
|
缓存
QLExpress使用及源码分析
本文介绍基于QLExpress的规则引擎实现方案,涵盖实体构建、接口定义与脚本编写。通过@QLAlias注解映射字段别名,结合YAML配置规则表达式,实现逻辑解耦。运行时动态解析AST语法树,支持汉化变量与上下文绑定,并提供缓存与延迟执行机制,提升性能与灵活性。
91 0
QLExpress使用及源码分析
|
4月前
|
Arthas 存储 运维
记Arthas实现一次CPU排查与代码热更新
本文介绍使用Arthas排查Java应用CPU占用过高问题的完整流程,涵盖线程分析、阻塞定位、watch命令追踪异常、jad反编译实现热更新及火焰图分析,实现无需重启应用的高效故障排查与代码修复。
165 0
|
4月前
|
canal 缓存 关系型数据库
微服务原理篇(Canal-Redis)
本文介绍了ES索引同步的常见方案,重点讲解Canal+MQ数据同步机制。通过解析MySQL的binlog日志,Canal模拟slave伪装接入主库,实现增量数据捕获,并结合RabbitMQ保证消息顺序性地同步至Elasticsearch。同时探讨了缓存一致性问题,提出使用分布式锁(如Redis)控制并发写操作,避免双写不一致。还涵盖Redis持久化、集群模式、过期淘汰策略及缓存三剑客(穿透、雪崩、击穿)的解决方案,系统梳理了高并发场景下的数据同步与缓存保障技术体系。
249 0
 微服务原理篇(Canal-Redis)
|
4月前
|
XML 算法 安全
详解RAG五种分块策略,技术原理、优劣对比与场景选型之道
RAG通过检索与生成结合,提升大模型在企业场景的准确性与可控性。分块策略是其核心,直接影响检索效果与生成质量。本文系统解析五种主流分块方法:固定大小、语义、递归、基于结构及LLM分块,对比其优缺点与适用场景,并提出组合优化建议,助力构建高效、可信的RAG系统。
218 0
|
4月前
|
人工智能 机器人 Java
黑马最新项目
AIGC项目涵盖大模型私有化部署、聊天机器人、RAG知识库及代码提示工具;天机AI集成SpringAI与多模型工作流;云岚到家聚焦微服务与分布式架构;四方保险构建统一支付与时序数据应用;星辰WMS与Dify项目即将发布。
204 0
黑马最新项目
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
大模型专业名词解释手册
本手册由油炸小波设计提示词、Manus创作,系统梳理大语言模型核心概念,涵盖基础原理、训练技术、优化压缩、推理应用、评估调试及伦理安全六大模块,深入浅出解析LLM关键技术术语。
430 0
|
机器学习/深度学习
阿里妈妈首提AIGB并实现大规模商业化落地,将在NeurIPS 2024正式开源Benchmark
阿里妈妈提出AI-Generated Bidding(AIGB)新范式及DiffBid生成式竞价模型,突破传统基于强化学习的自动竞价方法局限。AIGB将自动竞价视为生成问题,通过捕捉复杂依赖关系,提升长期规划和随机环境中的稳定性和效果。DiffBid基于条件扩散建模,灵活生成满足特定目标的竞价轨迹,显著提升GMV和ROI。实验结果表明,DiffBid实现了2.81%的GMV增长和3.36%的ROI增长。然而,生成式建模的复杂性也带来了训练和调优的挑战。 论文链接:https://arxiv.org/abs/2405.16141
639 9
畅游采购季,就用云分期!两成首付,轻松上云!
9月17日,阿里云分期付款(云分期)服务业务正式推出。作为业内首款面向企业客户在云电商采购的信用分期服务,云分期通过分析客户在阿里云官网消费情况,评估企业信用,向客户提供灵活便捷的分期服务。与自行向银行申请贷款相比较,云分期作为阿里云和蚂蚁金服(网商银行)、华夏银行联合推出的上云专项金融服务,在产品上更具竞争力。
3372 0
|
供应链 监控 搜索推荐
物联网技术在物流领域的应用会带来哪些影响?
物联网技术在物流领域的应用会带来哪些影响?
1634 58
|
网络协议 开发工具 C语言
Jetson错误(二):wget命令提示无法解析主机地址的问题解决
对于解决在NVIDIA Jetson平台上使用wget命令时出现的无法解析主机地址的问题,提供了两种解决方法:一种是临时修改DNS服务器为Google的公共DNS,另一种是永久修改DNS设置。
700 5