重要:发送事务消息#
在FinalRequestProcessor
的public void processRequest(Request request) {}
方法中,有如下代码
//todo 请求头不为空 if (request.hdr != null) { // 获取请求头 TxnHeader hdr = request.hdr; // 获取事务 Record txn = request.txn; // todo 跟进这个方法-----<--!!!!!!-----处理事务的逻辑,在这里面有向客户端发送事件的逻辑, 回调客户端的watcher----!!!!!!--> rc = zks.processTxn(hdr, txn); }
继续跟进去
// todo 处理事物日志 public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); // todo 继续跟进去!!!!!!!!! // todo 跟进 processTxn(hdr, txn) rc = getZKDatabase().processTxn(hdr, txn);
跟进ZkDatabase.java
中的processTxn(hdr, txn)方法
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { // todo 跟进 processTxn return dataTree.processTxn(hdr, txn); }
跟进到DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { // todo 根据客客户端发送过来的type进行switch, case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); // todo 跟进这个创建节点的方法 createNode( createTxn.getPath(),
根据请求头的值,进而判断出走到那个switch的分支,当前我们在控制台触发,进入到setData分支如下:跟进这个方法中可以看到它主要做了如下几件事
- 使用传递进来的新值替代旧data
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
**触发指定的事件watch,什么事件呢? NodeDataChange, 触发了哪个watcher呢? 跟进去查看 **
//todo setData public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { // todo 修改内存的数据 lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } // todo 终于 看到了 服务端 关于触发NodeDataChanged的事件 dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
补充Watch & EventType 类图#
跟进去dataWatches.triggerWatch(path, EventType.NodeDataChanged);
,源码如下, 主要的逻辑就是取出存放在服务端的watch,然后逐个回调他们的processor函数,问题来了,到底是哪些watcher呢? 其实就是我们在客户端启动时添加getData()时存进去的wather,也就是ServerCnxn
// todo 跟进去服务端的 触发事件, 但是吧, 很纳闷. 就是没有往客户端发送数据的逻辑 public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } // todo 继续跟进去, 看它如何回调的 w.process(e); } return watchers; }
怀着激动的心情去看看ServerCnxn的process()方法做了什么事?
来到ServerCnxn的实现类NIOServerCnxn, 确实很激动,看到了服务端在往客户端发送事务型消息, 并且new ReplyHeader(-1, -1L, 0)第一个位置上的参数是-1, 这一点很重要,因为客户端在接受到这个xid=-1的标记后,就会将这条响应交给EventThread处理
@Override synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); // todo 往服务端发送了 e event类型消息 sendResponse(h, e, "notification"); }
处理回调回调watch使用的响应#
进入到SendThread
的读就绪源码部分,如下: 它根据header.xid=-1就知道了这是事务类型的响应
// todo 服务端抛出来的事件, 客户端将把他存在EventThread的 watingEvents 队列中 // todo 它的实现逻辑也是这样, 会有另外一个线程不断的消费这个队列 if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } // todo 创建watcherEvent 并将服务端发送回来的数据,反序列化进这个对象中 WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path // todo 将server path 反转成 client path if (chrootPath != null) { String serverPath = event.getPath(); if (serverPath.compareTo(chrootPath) == 0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } //todo 跟进去 eventThread.queueEvent(we); return; } }
在这个方法的最后,将这个相应添加进EventThread
消费的队列中,跟进 eventThread.queueEvent(we);
// todo public void queueEvent(WatchedEvent event) { // todo 如果事件的类型是 none, 或者sessionState = 直接返回 /** * todo 事件的类型被设计成 watcher 接口的枚举 * None (-1), * NodeCreated (1), * NodeDeleted (2), * NodeDataChanged (3), * NodeChildrenChanged (4); */ if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event // todo 根据事件的具体类型,将观察者具体化, 跟进去 // todo 这个类是ClientCnxn的辅助类,作用就是将watcher 和它观察的事件封装在一起 WatcherSetEventPair pair = new WatcherSetEventPair( //todo 跟进这个 materialize方法. 其实就是从map中取出了和当前client关联的全部 watcher set watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing // todo 将watch集合 和 event 进行排队(按顺序添加到队列里了), 以便后续处理 , 怎么处理呢? 就在EventThread的run循环中消费 // todo watingEvent ==> LinkedBlockingQueue<Object> waitingEvents.add(pair); }
上面的代码主要做了如下几件事:
- 从map中取出和当前事件相关的全部watcher
- 将watcher set 添加进 waitingEvents队列中,等待EventThead的消费
跟进 watcher.materialize(event.getState(), event.getType(),
会追到下面的代码
case NodeDataChanged: // todo node中的data改变和 nodeCreate 都会来到下面的分支 case NodeCreated: synchronized (dataWatches) { // todo dataWatches 就是刚才存放 path : watcher 的map // todo dataWatches.remove(clientPath) 移除并返回clientPath对应的watcher , 放入 result 中 addTo(dataWatches.remove(clientPath), result); }
上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除并返回指定的watcher,这也说明了,为什么zk原生客户端添加的watcher仅仅会回调一次
EventThread是如何消费waitingEvents的#
EventThread是一条守护线程, 因此它拥有自己的不断在运行的run方法,它就是在这个run方法中对这个队列进行消费的
@Override public void run() { try { isRunning = true; // todo 同样是无限的循环 while (true) { // todo 从watingEvnets 中取出一个 WatcherSetEventPair Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // todo 本类方法,处理这个事件,继续进入,方法就在下面 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catc
继续跟进它的processEvent(event)
,最终会在这个方法中调用下面的代码,这里的watcher就是我在本篇博客的开始位置添加进去的watcher,至此打完收工
watcher.process(pair.event);
总结:#
当客户端启动时添加watcher对某一个特定path上的node进行监听时 , 客户端的watcher被封装进WatcherRegistion中再进一步发送的服务端
watcher不为空的packet达到服务端后会被巧妙的处理,将ServerCnxn当成watcher注册添加到服务端维护的那份watcher map table中
当watcher关联的node发生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged时,在最后一个处理器就会触发发送事务类型事件的动作,其实就是回调ServerCnxn的process()方法
事务类型的响应返回到客户端,跟进xid区分出到底是哪种响应,如-1是NodeDataChanged,最终会把这个事务事件提交到EventThread消费的waitingEvents等待EventThread消费它,回调客户端的watcher的process()方法