前言
今天继续Watcher机制的源码研究,ZK的源码比起Spring和MQ来说要难很多,不过不得不说ZK真的很优秀。下面继续源码解读。
Watcher在Server端的存储Watcher在server都由WatchManager存储管理类图如下:
方法的主要作用:
1.size()获取记录的watcher的数量
2.addWatch() 注册一个path的watcher
3.removeWatcher(),删除watcher
4.triggerWatch(),根据path,EventType找到出发的Watches,触发这些watcher
triggerWatch()是用于出发watch事件,总结下流程:
1). 根据事件类型、连接状态、节点路径创建WatchedEvent
2). 从watchTable中移除传入的path对应的键值对,并且返回path对应的watcher集合
3). 判断watcher集合是否为空,若为空,则之后会返回null
4). 遍历2中的watcher集合,对每个watcher,从watch2Paths中取出path集合
5). 判断4中的path集合是否为空,若不为空,则从集合中移除传入的path
6). 再次遍历watcher集合,对每个watcher,若supress不为空并且包含了该watcher,否则跳过
7). 调用watcher的process方法进行相应处理,之后返回watcher集合
值得注意得 w.process(e)是NIOServerCnxn.process()不要和Client端搞混了,dumpWatches用于讲两个map dump到writer最终写入磁盘
Watcher的运行机制
要点:Client端注册watcher 主要通过以下几个api完成watche的注册register(),exists(),getData(),getChildren(),这里不多赘述。直接进入核心代码: 衔接代码,最终会进入写事件中,调用createBB写入ByteBuffer并通过nio发送,此时packet从outgoingQueue移入到pendingQueue中等待server的回复,到此为止client的getData请求已经发送至Server,但是此时的Watch还没有在client端完成注册,那什么时候完成注册呢,其实是在Server执行完成后,返回给Client端的时候完成注册Server返回Client进入上述代码的读事件,调用sendThread.readResponse(incomingBuffer),然后调用ClientCnxn.finishPacket()方法。 另外,Packet是zk的通信协议数据包,用于客户端和服务端的网络传输,在 ClientCnxn 中 WatchRegistration 也会被封装到 Packet 中,调用 queuePacket放入outgoingQueue即发送队列中(生产packet)然后SendThread 线程调用doTransport方法,从outgoingQueue中消费Packet,最中调用ClientCnxnSocketNIO.doIO()进行I/O操作发送至服务端。我们可以看到在WatchRegistration.register()完成watch在WatchManger的注册,也就是客户端在放请求后需要等到Server端的正确响应才可以注册Watcherd的图表示一下
服务端处理Watcher
继续看Server端收到I/O请求时,怎么处理请求的 上述代码可知在processEvent中完成了客户端Watcher的执行,到此Watcher的触发流程算是结束了。 总结一下:
1. 客户端发送注册Watcher的packet请求
2. Server端处理请求,注册path->ServerCnxn到WatchManger
3. 客户端收到Server端的响应,完成客户端的Watcher的注册
4. 服务端触发Watcher,发送event到Client
5. Client根据event触发对应的Watche调用process()
流程其实很简单,但是涉及到的代码太多太多,这就是一次完成的zk请求。 通过这里触发Watcher的流程也可以看出zk的watch都是一次性的,出发后都会删除,很多开源的客户端都会去解决这个问题达到重复注册的效果。
吐槽 一次小小的请求,涉及到代码实在太多太多..接下来就是剖析这里的细节,比如Session的存储,Leader的选举,I/O的详细处理等等。整理下来Zk的Watcher注册,运行机制大概就这些,细节部分后续补上,才能更加容易理解。