47-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-2[WatchManager])

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。

一、前言

  前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。

二、WatchManager源码分析

2.1 类的属性 

public class WatchManager {

   // Logger

   private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);


   // watcher表

   private final HashMap<String, HashSet<Watcher>> watchTable =

       new HashMap<String, HashSet<Watcher>>();


   // watcher到节点路径的映射

   private final HashMap<Watcher, HashSet<String>> watch2Paths =

       new HashMap<Watcher, HashSet<String>>();

}

说明:WatcherManager类用于管理watchers和相应的触发器。watchTable表示从节点路径到watcher集合的映射,而watch2Paths则表示从watcher到所有节点路径集合的映射。

2.2 核心方法分析

1. size方法

public synchronized int size(){

   int result = 0;

   for(Set<Watcher> watches : watchTable.values()) { // 遍历watchTable所有的值集合(HashSet<Watcher>集合)

       // 每个集合大小累加

       result += watches.size();

   }

   // 返回结果

   return result;

}

说明:可以看到size方法是同步的,因此在多线程环境下是安全的,其主要作用是获取watchTable的大小,即遍历watchTable的值集合。

2. addWatch方法

public synchronized void addWatch(String path, Watcher watcher) {

   // 根据路径获取对应的所有watcher

   HashSet<Watcher> list = watchTable.get(path);

   if (list == null) { // 列表为空

       // don't waste memory if there are few watches on a node

       // rehash when the 4th entry is added, doubling size thereafter

       // seems like a good compromise

       // 新生成watcher集合

       list = new HashSet<Watcher>(4);

       // 存入watcher表

       watchTable.put(path, list);

   }

   // 将watcher直接添加至watcher集合

   list.add(watcher);


   // 通过watcher获取对应的所有路径

   HashSet<String> paths = watch2Paths.get(watcher);

   if (paths == null) { // 路径为空

       // cnxns typically have many watches, so use default cap here

       // 新生成hash集合

       paths = new HashSet<String>();

       // 将watcher和对应的paths添加至映射中

       watch2Paths.put(watcher, paths);

   }

   // 将路径添加至paths集合

   paths.add(path);

}

说明:addWatch方法同样是同步的,其大致流程如下

  ① 通过传入的path(节点路径)从watchTable获取相应的watcher集合,进入②

  ② 判断①中的watcher是否为空,若为空,则进入③,否则,进入④

  ③ 新生成watcher集合,并将路径path和此集合添加至watchTable中,进入④【类似缓存操作】

  ④ 将传入的watcher添加至watcher集合,即完成了path和watcher添加至watchTable的步骤,进入⑤

  ⑤ 通过传入的watcher从watch2Paths中获取相应的path集合,进入⑥

  ⑥ 判断path集合是否为空,若为空,则进入⑦,否则,进入⑧

  ⑦ 新生成path集合,并将watcher和paths添加至watch2Paths中,进入⑧

  ⑧ 将传入的path(节点路径)添加至path集合,即完成了path和watcher添加至watch2Paths的步骤。

综上:addWatche方法会将:

1.入参所对应的watcher添加到入参path所对应的全部Watcher集合中,如path下已有则添加,没有创建新的并添加进去;

2.入参所对应的path添加到入参watcher所对应给的所有路径集合中,如watcher对应路径为空则创建新的集合进行添加,非空将入参path直接添加进去。

3. removeWatcher方法  

public synchronized void removeWatcher(Watcher watcher) {

   // 从wach2Paths中移除watcher,并返回watcher对应的path集合

   HashSet<String> paths = watch2Paths.remove(watcher);

   if (paths == null) { // 集合为空,直接返回

       return;

   }

   for (String p : paths) { // 遍历路径集合

       // 从watcher表中根据路径取出相应的watcher集合

       HashSet<Watcher> list = watchTable.get(p);

       if (list != null) { // 若集合不为空

           // 从list中移除该watcher

           list.remove(watcher);

           if (list.size() == 0) { // 移除后list为空,则从watch表中移出

               watchTable.remove(p);

           }

       }

   }

}

说明:removeWatcher用作从watch2Paths和watchTable中中移除该watcher,其大致步骤如下

  ① 从watch2Paths中移除传入的watcher,并且返回该watcher对应的路径集合,进入②

  ② 判断返回的路径集合是否为空,若为空,直接返回,否则,进入③

  ③ 遍历②中的路径集合,对每个路径,都从watchTable中取出与该路径对应的watcher集合,进入④

  ④ 若③中的watcher集合不为空,则从该集合中移除watcher,并判断移除元素后的集合大小是否为0,若为0,进入⑤

  ⑤ 从watchTable中移除路径

4. triggerWatch方法

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {

   // 根据事件类型、连接状态、节点路径创建WatchedEvent

   WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);


   // watcher集合

   HashSet<Watcher> watchers;

   synchronized (this) { // 同步块

       // 从watcher表中移除path,并返回其对应的watcher集合

       watchers = watchTable.remove(path);

       if (watchers == null || watchers.isEmpty()) { // watcher集合为空

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,

                                        "No watchers for " + path);

           }

           // 返回

           return null;

       }

       for (Watcher w : watchers) { // 遍历watcher集合

           // 根据watcher从watcher表中取出路径集合

           HashSet<String> paths = watch2Paths.get(w);

           if (paths != null) { // 路径集合不为空

               // 则移除路径

               paths.remove(path);

           }

       }

   }

   for (Watcher w : watchers) { // 遍历watcher集合

       if (supress != null && supress.contains(w)) { // supress不为空并且包含watcher,则跳过

           continue;

       }

       // 进行处理

       w.process(e);

   }

   return watchers;

}

 说明:该方法主要用于触发watch事件,并对事件进行处理。其大致步骤如下

  ① 根据事件类型、连接状态、节点路径创建WatchedEvent,进入②

  ② 从watchTable中移除传入的path对应的键值对,并且返回path对应的watcher集合,进入③

  ③ 判断watcher集合是否为空,若为空,则之后会返回null,否则,进入④

  ④ 遍历②中的watcher集合,对每个watcher,从watch2Paths中取出path集合,进入⑤

  ⑤ 判断④中的path集合是否为空,若不为空,则从集合中移除传入的path。进入⑥

  ⑥ 再次遍历watcher集合,对每个watcher,若supress不为空并且包含了该watcher,则跳过,否则,进入⑦

  ⑦ 调用watcher的process方法进行相应处理,之后返回watcher集合。【这里的process具体怎么执行的呢

5. dumpWatches方法

public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {

   if (byPath) { // 控制写入watchTable或watch2Paths

       for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) { // 遍历每个键值对

           // 写入键

           pwriter.println(e.getKey());

           for (Watcher w : e.getValue()) { // 遍历值(HashSet<Watcher>)

               pwriter.print("\t0x");

               pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));

               pwriter.print("\n");

           }

       }

   } else {

       for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) { // 遍历每个键值对

           // 写入"0x"

           pwriter.print("0x");

           pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));

           for (String path : e.getValue()) { // 遍历值(HashSet<String>)

               //

               pwriter.print("\t");

               pwriter.println(path);

           }

       }

   }

}

  说明:dumpWatches用作将watchTable或watch2Paths写入磁盘。

三、总结

  WatchManager类用作管理watcher、其对应的路径以及触发器,其方法都是针对两个映射的操作。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
14天前
|
负载均衡 监控 Java
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
|
1天前
|
消息中间件 Java API
解密微服务架构:如何在Java中实现高效的服务通信
微服务架构作为一种现代软件开发模式,通过将应用拆分成多个独立的服务,提升了系统的灵活性和扩展性。然而,实现微服务之间的高效通信仍然是许多开发者面临的挑战。本文将探讨在Java环境中实现微服务架构时,如何使用不同的通信机制来优化服务之间的交互,包括同步和异步通信的方法,以及相关的最佳实践。
|
12天前
|
Prometheus 监控 Java
微服务架构下的服务治理策略:打破服务混乱的惊天秘籍,开启系统稳定的神奇之门!
【8月更文挑战第7天】微服务架构将应用细分为可独立部署的小服务,提升灵活性与可扩展性。但服务增多带来治理挑战。通过服务注册与发现(如Eureka)、容错机制(如Hystrix)、监控工具(如Prometheus+Grafana)、集中配置管理(如Spring Cloud Config)和服务网关(如Zuul),可有效解决这些挑战,确保系统的高可用性和性能。合理运用这些技术和策略,能充分发挥微服务优势,构建高效应用系统。
26 1
|
20天前
|
消息中间件 监控 Cloud Native
构建高效后端服务:微服务架构的实践与挑战
【7月更文挑战第30天】在现代软件开发中,后端服务的效率和可扩展性是项目成功的关键因素之一。微服务架构作为一种新兴的软件开发模式,以其灵活性、独立性和可维护性受到了广泛的关注和应用。本文将深入探讨微服务架构的核心概念,分析其在实际应用中的优势与面临的挑战,并分享一些实践技巧和最佳实践,帮助读者理解如何设计并实现一个高效的后端服务。
36 3
|
4天前
|
Kubernetes Nacos 微服务
【技术难题破解】Nacos v2.2.3 + K8s 微服务注册:强制删除 Pod 却不消失?!7步排查法+实战代码,手把手教你解决Nacos Pod僵死问题,让服务瞬间满血复活!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心受到欢迎,但有时会遇到“v2.2.3 k8s 微服务注册nacos强制删除 pod不消失”的问题。本文介绍此现象及其解决方法,帮助开发者确保服务稳定运行。首先需检查Pod状态与事件、配置文件及Nacos配置,确认无误后可调整Pod生命周期管理,并检查Kubernetes版本兼容性。若问题持续,考虑使用Finalizers、审查Nacos日志或借助Kubernetes诊断工具。必要时,可尝试手动强制删除Pod。通过系统排查,通常能有效解决此问题。
10 0
|
11天前
|
API Go 数据安全/隐私保护
go-zero微服务框架的静态文件服务
【8月更文挑战第7天】`go-zero` 微服务框架支持多种静态文件服务实现方式。常用方法是利用 `Go` 标准库 `http.FileServer`。通过设置静态文件根目录并使用 `http.StripPrefix` 去除路径前缀,能确保 `/static/` 开头的请求正确返回文件。此外,结合 `go-zero` 的路由机制可更灵活地控制静态文件服务,例如仅在特定 API 路径 `/api/static` 下提供服务,从而实现精细化访问控制。
|
12天前
|
监控 供应链 安全
构建高效微服务架构:API网关与服务熔断策略
【7月更文挑战第38天】随着现代应用程序向微服务架构的转型,系统的稳定性和效率成为了开发团队关注的焦点。本文将探讨在微服务环境中实现系统可靠性的关键组件——API网关,以及如何在服务间通讯时采用熔断机制来防止故障蔓延。通过分析API网关的核心功能和设计原则,并结合熔断策略的最佳实践,我们旨在提供一套提高分布式系统弹性的策略。
|
26天前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
29天前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
|
1月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略

热门文章

最新文章

相关产品

  • 微服务引擎