WatchManager

简介: WatchManager是ZooKeeper服务端用于管理Watcher的核心类,维护了路径到Watcher的映射表(watchTable)及Watcher到路径的反向映射(watch2Paths)。其主要方法包括:addWatch添加监听、removeWatcher移除监听、triggerWatch触发事件并清理状态、size统计监听数量、dumpWatches导出监控信息。所有操作均加锁,确保线程安全,是ZooKeeper事件通知机制的关键实现。

一、前言
  前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。
二、WatchManager源码分析
2.1 类的属性 
public class WatchManager {
// Logger
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

// watcher表
private final HashMap<String, HashSet<Watcher>> watchTable =
    new HashMap<String, HashSet<Watcher>>();

// watcher到节点路径的映射
private final HashMap<Watcher, HashSet<String>> watch2Paths =
    new HashMap<Watcher, HashSet<String>>();

}
说明:WatcherManager类用于管理watchers和相应的触发器。watchTable表示从节点路径到watcher集合的映射,而watch2Paths则表示从watcher到所有节点路径集合的映射。
2.2 核心方法分析

  1. size方法
    public synchronized int size(){
    int result = 0;
    for(Set watches : watchTable.values()) { // 遍历watchTable所有的值集合(HashSet集合)
     // 每个集合大小累加
     result += watches.size();
    
    }
    // 返回结果
    return result;
    }
    说明:可以看到size方法是同步的,因此在多线程环境下是安全的,其主要作用是获取watchTable的大小,即遍历watchTable的值集合。
  2. addWatch方法
    public synchronized void addWatch(String path, Watcher watcher) {
    // 根据路径获取对应的所有watcher
    HashSet list = watchTable.get(path);
    if (list == null) { // 列表为空

     // don't waste memory if there are few watches on a node
     // rehash when the 4th entry is added, doubling size thereafter
     // seems like a good compromise
     // 新生成watcher集合
     list = new HashSet<Watcher>(4);
     // 存入watcher表
     watchTable.put(path, list);
    

    }
    // 将watcher直接添加至watcher集合
    list.add(watcher);

    // 通过watcher获取对应的所有路径
    HashSet paths = watch2Paths.get(watcher);
    if (paths == null) { // 路径为空

     // cnxns typically have many watches, so use default cap here
     // 新生成hash集合
     paths = new HashSet<String>();
     // 将watcher和对应的paths添加至映射中
     watch2Paths.put(watcher, paths);
    

    }
    // 将路径添加至paths集合
    paths.add(path);
    }
    说明:addWatch方法同样是同步的,其大致流程如下
      ① 通过传入的path(节点路径)从watchTable获取相应的watcher集合,进入②
      ② 判断①中的watcher是否为空,若为空,则进入③,否则,进入④
      ③ 新生成watcher集合,并将路径path和此集合添加至watchTable中,进入④【类似缓存操作】
      ④ 将传入的watcher添加至watcher集合,即完成了path和watcher添加至watchTable的步骤,进入⑤
      ⑤ 通过传入的watcher从watch2Paths中获取相应的path集合,进入⑥
      ⑥ 判断path集合是否为空,若为空,则进入⑦,否则,进入⑧
      ⑦ 新生成path集合,并将watcher和paths添加至watch2Paths中,进入⑧
      ⑧ 将传入的path(节点路径)添加至path集合,即完成了path和watcher添加至watch2Paths的步骤。
    综上:addWatche方法会将:
    1.入参所对应的watcher添加到入参path所对应的全部Watcher集合中,如path下已有则添加,没有创建新的并添加进去;
    2.入参所对应的path添加到入参watcher所对应给的所有路径集合中,如watcher对应路径为空则创建新的集合进行添加,非空将入参path直接添加进去。

  3. removeWatcher方法  
    public synchronized void removeWatcher(Watcher watcher) {
    // 从wach2Paths中移除watcher,并返回watcher对应的path集合
    HashSet paths = watch2Paths.remove(watcher);
    if (paths == null) { // 集合为空,直接返回
     return;
    
    }
    for (String p : paths) { // 遍历路径集合
     // 从watcher表中根据路径取出相应的watcher集合
     HashSet<Watcher> list = watchTable.get(p);
     if (list != null) { // 若集合不为空
         // 从list中移除该watcher
         list.remove(watcher);
         if (list.size() == 0) { // 移除后list为空,则从watch表中移出
             watchTable.remove(p);
         }
     }
    
    }
    }
    说明:removeWatcher用作从watch2Paths和watchTable中中移除该watcher,其大致步骤如下
      ① 从watch2Paths中移除传入的watcher,并且返回该watcher对应的路径集合,进入②
      ② 判断返回的路径集合是否为空,若为空,直接返回,否则,进入③
      ③ 遍历②中的路径集合,对每个路径,都从watchTable中取出与该路径对应的watcher集合,进入④
      ④ 若③中的watcher集合不为空,则从该集合中移除watcher,并判断移除元素后的集合大小是否为0,若为0,进入⑤
      ⑤ 从watchTable中移除路径
  4. triggerWatch方法
    public Set triggerWatch(String path, EventType type, Set supress) {
    // 根据事件类型、连接状态、节点路径创建WatchedEvent
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);

    // watcher集合
    HashSet watchers;
    synchronized (this) { // 同步块

     // 从watcher表中移除path,并返回其对应的watcher集合
     watchers = watchTable.remove(path);
     if (watchers == null || watchers.isEmpty()) { // watcher集合为空
         if (LOG.isTraceEnabled()) { 
             ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                      "No watchers for " + path);
         }
         // 返回
         return null;
     }
     for (Watcher w : watchers) { // 遍历watcher集合
         // 根据watcher从watcher表中取出路径集合
         HashSet<String> paths = watch2Paths.get(w);
         if (paths != null) { // 路径集合不为空
             // 则移除路径
             paths.remove(path);
         }
     }
    

    }
    for (Watcher w : watchers) { // 遍历watcher集合

     if (supress != null && supress.contains(w)) { // supress不为空并且包含watcher,则跳过
         continue;
     }
     // 进行处理
     w.process(e);
    

    }
    return watchers;
    }
     说明:该方法主要用于触发watch事件,并对事件进行处理。其大致步骤如下
      ① 根据事件类型、连接状态、节点路径创建WatchedEvent,进入②
      ② 从watchTable中移除传入的path对应的键值对,并且返回path对应的watcher集合,进入③
      ③ 判断watcher集合是否为空,若为空,则之后会返回null,否则,进入④
      ④ 遍历②中的watcher集合,对每个watcher,从watch2Paths中取出path集合,进入⑤
      ⑤ 判断④中的path集合是否为空,若不为空,则从集合中移除传入的path。进入⑥
      ⑥ 再次遍历watcher集合,对每个watcher,若supress不为空并且包含了该watcher,则跳过,否则,进入⑦
      ⑦ 调用watcher的process方法进行相应处理,之后返回watcher集合。【这里的process具体怎么执行的呢】

  5. dumpWatches方法
    public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
    if (byPath) { // 控制写入watchTable或watch2Paths
     for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) { // 遍历每个键值对
         // 写入键
         pwriter.println(e.getKey());
         for (Watcher w : e.getValue()) { // 遍历值(HashSet<Watcher>)
             pwriter.print("\t0x");
             pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
             pwriter.print("\n");
         }
     }
    
    } else {
     for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) { // 遍历每个键值对
         // 写入"0x"
         pwriter.print("0x");
         pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
         for (String path : e.getValue()) { // 遍历值(HashSet<String>)
             // 
             pwriter.print("\t");
             pwriter.println(path);
         }
     }
    
    }
    }
      说明:dumpWatches用作将watchTable或watch2Paths写入磁盘。
    三、总结
      WatchManager类用作管理watcher、其对应的路径以及触发器,其方法都是针对两个映射的操作。
相关文章
|
3月前
|
人工智能 安全 数据可视化
面向业务落地的AI产品评测体系设计与平台实现
在AI技术驱动下,淘宝闪购推进AI应用落地,覆盖数字人、数据分析、多模态创作与搜推AI化四大场景。面对研发模式变革与Agent链路复杂性,构建“评什么、怎么评、如何度量”的评测体系,打造端到端质量保障平台,并规划多模态评测、可视化标注与插件市场,支撑业务持续创新。
790 38
|
3月前
|
机器学习/深度学习 缓存 物联网
打造社交APP人物动漫化:通义万相wan2.x训练优化指南
本项目基于通义万相AIGC模型,为社交APP打造“真人变身跳舞动漫仙女”特效视频生成功能。通过LoRA微调与全量训练结合,并引入Sage Attention、TeaCache、xDIT并行等优化技术,实现高质量、高效率的动漫风格视频生成,兼顾视觉效果与落地成本,最终优选性价比最高的wan2.1 lora模型用于生产部署。(239字)
1345 103
|
3月前
|
JSON 安全 JavaScript
HTTPS 原理
HTTPS是HTTP与SSL/TLS的结合,通过数字证书验证身份,利用非对称加密安全交换会话密钥,再以对称加密高效传输数据。它确保了通信的机密性、完整性和服务器真实性,在互联网上构建安全加密通道。
|
3月前
|
人工智能 安全 数据可视化
面向业务落地的AI产品评测体系设计与平台实现
在AI技术驱动下,淘宝闪购推进大模型应用落地,构建覆盖“评什么、怎么评、如何度量”的全链路评测体系。面对研发模式变革与Agent复杂性挑战,平台以端到端评测为主、分层测评为辅,打造可回放环境、多裁判机制及变更分级策略,实现质量与效率平衡。已支撑10+部门、90+AI产品,沉淀千余评测集,问题解决率超80%。未来将拓展多模态评测、可视化标注与插件市场,推动评测生态化发展。
|
4月前
|
JSON 安全 JavaScript
深入浅出解析 HTTPS 原理
HTTPS是HTTP与SSL/TLS结合的安全协议,通过数字证书验证身份,利用非对称加密安全交换会话密钥,再以对称加密高效传输数据,确保通信的机密性、完整性和真实性。整个过程如同建立一条加密隧道,保障网络交互安全。
2377 16
|
3月前
|
人工智能 计算机视觉 测试技术
Meta SAM3开源
Meta发布并开源SAM 3,首个支持文本、点、框等多提示的统一图像视频分割模型,突破性实现开放词汇概念的全实例分割。基于Meta Perception Encoder与DETR架构,结合AI与人工协同数据引擎,构建超400万概念数据集,在SA-Co基准达人类水平75%-80%。支持大规模可提示分割与跟踪,推动视觉基础模型新进展。(239字)
|
3月前
|
消息中间件 Java 数据格式
微服务核心组件:消息中间件(MQ)从入门到实战
本章深入讲解微服务中消息中间件的核心作用,聚焦RabbitMQ与SpringAMQP实战。涵盖同步与异步通信对比、MQ选型分析,通过Docker快速部署RabbitMQ,详解生产者/消费者模型、四种消息模式(简单队列、工作队列、发布订阅、通配符路由),并引入prefetch优化与JSON序列化提升性能。结合注解驱动开发,全面掌握高可用、低耦合的异步通信架构设计。(239字)
|
3月前
|
XML 算法 安全
详解RAG五种分块策略,技术原理、优劣对比与场景选型之道
RAG通过检索与生成结合,提升大模型在企业场景的准确性与安全性。分块策略是其核心,直接影响检索效果与答案质量。本文系统解析五种主流分块方法——固定大小、语义、递归、基于结构及LLM分块,对比优缺点与适用场景,助力构建高效、可靠的RAG系统。
|
3月前
|
机器学习/深度学习 人工智能 算法
让AI真正读懂长文本的秘密武器
通义实验室推出QwenLong-L1.5,基于Qwen3-30B-A3B打造的长文本推理专家。通过高质量多跳数据合成、稳定强化学习算法与突破窗口限制的记忆框架,系统性解决长文本“学不好、用不了”难题,在多跳推理、超长上下文等任务中媲美GPT-5与Gemini。

热门文章

最新文章