一、前言
前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。
二、WatchManager源码分析
2.1 类的属性
说明:WatcherManager类用于管理watchers和相应的触发器。watchTable表示从节点路径到watcher集合的映射,而watch2Paths则表示从watcher到所有节点路径集合的映射。
2.2 核心方法分析
- size方法
说明:可以看到size方法是同步的,因此在多线程环境下是安全的,其主要作用是获取watchTable的大小,即遍历watchTable的值集合。 - addWatch方法
说明:addWatch方法同样是同步的,其大致流程如下
① 通过传入的path(节点路径)从watchTable获取相应的watcher集合,进入②
② 判断①中的watcher是否为空,若为空,则进入③,否则,进入④
③ 新生成watcher集合,并将路径path和此集合添加至watchTable中,进入④【类似缓存操作】
④ 将传入的watcher添加至watcher集合,即完成了path和watcher添加至watchTable的步骤,进入⑤
⑤ 通过传入的watcher从watch2Paths中获取相应的path集合,进入⑥
⑥ 判断path集合是否为空,若为空,则进入⑦,否则,进入⑧
⑦ 新生成path集合,并将watcher和paths添加至watch2Paths中,进入⑧
⑧ 将传入的path(节点路径)添加至path集合,即完成了path和watcher添加至watch2Paths的步骤。
综上:addWatche方法会将:
1.入参所对应的watcher添加到入参path所对应的全部Watcher集合中,如path下已有则添加,没有创建新的并添加进去;
2.入参所对应的path添加到入参watcher所对应给的所有路径集合中,如watcher对应路径为空则创建新的集合进行添加,非空将入参path直接添加进去。 - removeWatcher方法
说明:removeWatcher用作从watch2Paths和watchTable中中移除该watcher,其大致步骤如下
① 从watch2Paths中移除传入的watcher,并且返回该watcher对应的路径集合,进入②
② 判断返回的路径集合是否为空,若为空,直接返回,否则,进入③
③ 遍历②中的路径集合,对每个路径,都从watchTable中取出与该路径对应的watcher集合,进入④
④ 若③中的watcher集合不为空,则从该集合中移除watcher,并判断移除元素后的集合大小是否为0,若为0,进入⑤
⑤ 从watchTable中移除路径 triggerWatch方法
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Set triggerWatch(String path, EventType type, Set supress) {
// 根据事件类型、连接状态、节点路径创建WatchedEvent
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);// watcher集合
HashSet watchers;
synchronized (this) { // 同步块// 从watcher表中移除path,并返回其对应的watcher集合 watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { // watcher集合为空 if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } // 返回 return null; } for (Watcher w : watchers) { // 遍历watcher集合 // 根据watcher从watcher表中取出路径集合 HashSet<String> paths = watch2Paths.get(w); if (paths != null) { // 路径集合不为空 // 则移除路径 paths.remove(path); } }}
for (Watcher w : watchers) { // 遍历watcher集合if (supress != null && supress.contains(w)) { // supress不为空并且包含watcher,则跳过 continue; } // 进行处理 w.process(e);}
return watchers;
}
说明:该方法主要用于触发watch事件,并对事件进行处理。其大致步骤如下
① 根据事件类型、连接状态、节点路径创建WatchedEvent,进入②
② 从watchTable中移除传入的path对应的键值对,并且返回path对应的watcher集合,进入③
③ 判断watcher集合是否为空,若为空,则之后会返回null,否则,进入④
④ 遍历②中的watcher集合,对每个watcher,从watch2Paths中取出path集合,进入⑤
⑤ 判断④中的path集合是否为空,若不为空,则从集合中移除传入的path。进入⑥
⑥ 再次遍历watcher集合,对每个watcher,若supress不为空并且包含了该watcher,则跳过,否则,进入⑦
⑦ 调用watcher的process方法进行相应处理,之后返回watcher集合。【这里的process具体怎么执行的呢】- dumpWatches方法
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
if (byPath) { // 控制写入watchTable或watch2Paths
} else {for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) { // 遍历每个键值对 // 写入键 pwriter.println(e.getKey()); for (Watcher w : e.getValue()) { // 遍历值(HashSet<Watcher>) pwriter.print("\t0x"); pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId())); pwriter.print("\n"); } }
}for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) { // 遍历每个键值对 // 写入"0x" pwriter.print("0x"); pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId())); for (String path : e.getValue()) { // 遍历值(HashSet<String>) // pwriter.print("\t"); pwriter.println(path); } }
}
说明:dumpWatches用作将watchTable或watch2Paths写入磁盘。
三、总结
WatchManager类用作管理watcher、其对应的路径以及触发器,其方法都是针对两个映射的操作。