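Overview
ZooKeeper (ZK) provides publish/subscribe for distributed data. A typical publish/subscribe system defines a one-to-many relationship: multiple subscribers listen to the same topic object, and when the state of that object changes, all subscribers are notified. ZK implements this kind of distributed notification through its Watcher mechanism.
ZK lets a client register a Watcher with the server. When a specified event on the server triggers that Watcher, the server sends an event notification to the corresponding client.
The overall flow: the client registers a Watcher with ZK and, if registration succeeds, stores the Watcher locally. When the server triggers a Watcher event, it sends a notification to the client, and the client retrieves the matching Watcher from its ClientWatchManager and invokes the callback.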
Watcher interface
So what exactly is a Watcher, and what is it for?
/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 */
@InterfaceAudience.Public
public interface Watcher {
    void process(WatchedEvent event);
}
Once you register a watch with the ZK server through an object that implements this interface, the process method is called back whenever the server delivers an event notification to the client.
WatchedEvent
So what is inside a WatchedEvent?
public class WatchedEvent {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    private final KeeperState keeperState;
    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    private final EventType eventType;
    private String path;
}
KeeperState and EventType are both enums; they represent the notification state and the event type respectively. path is the node path that the client is watching.
Common KeeperState and EventType combinations
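KeeperState | EventType | Trigger condition | Description |
SyncConnected | None(-1) | The client successfully establishes a session with the server | The client and server are connected |
SyncConnected | NodeCreated(1) | The data node the Watcher is watching is created | The client and server are connected |
SyncConnected | NodeDeleted(2) | The data node the Watcher is watching is deleted | The client and server are connected |
SyncConnected | NodeDataChanged(3) | The content of the data node the Watcher is watching changes (data content or data version) | The client and server are connected |
SyncConnected | NodeChildrenChanged(4) | The child list of the data node the Watcher is watching changes | The client and server are connected |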
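For the NodeDataChanged event type, a change means either a change to the node's data content or a change to its data version (dataVersion). Whenever a client calls the data update interface, dataVersion changes whether or not the content actually differs, and the corresponding Watcher is triggered. Because every update bumps the version, an update that writes the same content back is still visible, which avoids the classic ABA problem of optimistic locking.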
WatcherEvent
WatchedEvent contains the following method:
/**
 * Convert WatchedEvent to type that can be sent over network
 */
public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}
Broadly speaking, WatcherEvent and WatchedEvent represent the same thing: both wrap a server-side event. WatchedEvent is the object used for logic processing, while WatcherEvent is the entity object used for network transport. As the code above shows, a WatcherEvent is constructed from the values of the WatchedEvent's fields.
The Javadoc at people.apache.org/~larsgeorge… shows that it implements the Record interface:
public class WatcherEvent extends Object implements org.apache.jute.Record
The Record interface defines the serialization and deserialization methods:
@InterfaceAudience.Public
public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
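To make the "transport object" idea concrete, here is a minimal sketch of a wire-level event that carries only two integer codes and a path and can be written to and read back from bytes. WireEvent and its encoding are hypothetical stand-ins built on java.io; this is not ZooKeeper's WatcherEvent, and it does not use the jute archive format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical transport object: only int codes and a path, nothing else.
final class WireEvent {
    final int type;    // event type code, e.g. 3 for NodeDataChanged in the table above
    final int state;   // notification state code (the value used in main is illustrative)
    final String path; // path of the node the event refers to

    WireEvent(int type, int state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }

    // Write the three fields in a fixed order.
    byte[] serialize() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(type);
            out.writeInt(state);
            out.writeUTF(path);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the fields back in the same order they were written.
    static WireEvent deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int type = in.readInt();
            int state = in.readInt();
            String path = in.readUTF();
            return new WireEvent(type, state, path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WireEvent sent = new WireEvent(3, 3, "/app/config");
        WireEvent received = deserialize(sent.serialize());
        System.out.println(received.type + " " + received.state + " " + received.path);
    }
}
```

The receiving side can rebuild a richer logic object from these three values, which is the role WatchedEvent plays on the client.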
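Related components
Related processes
The mechanism can be divided into three phases:
- The client registers a Watcher
- The server handles the Watcher
- The client calls back the Watcher
Client registers a Watcher
When creating a ZK client instance, we can pass a default Watcher to the constructor: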
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
This Watcher is saved in ZKWatchManager and serves as the default Watcher for the whole session:
watchManager.defaultWatcher = watcher;
In addition, the ZK client can register a Watcher with the server through three interfaces: getData, getChildren, and exists.
We use the getData interface as the example.
public byte[] getData(final String path, Watcher watcher, Stat stat) {
    .....
}

public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
    return getData(path, getDefaultWatcher(watch), stat);
}
If the watch argument is true, getDefaultWatcher returns the default Watcher that was passed in when the ZooKeeper instance was created:
private Watcher getDefaultWatcher(boolean required) {
    if (required) {
        if (watchManager.defaultWatcher != null) {
            return watchManager.defaultWatcher;
        } else {
            throw new IllegalStateException("Default watcher is required, but it is null.");
        }
    }
    return null;
}
Here is the complete getData code:
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    // create a data-type watch registration
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    // prepend the client's chroot so the path becomes the full server-side path
    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    // flag whether a watcher is attached
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
- Create a DataWatchRegistration.
- Convert the path (the client may be using a chroot, so the path must be converted to the server-side path before the request is sent).
- Submit the request through ClientCnxn.
public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(
        h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
    ....
    ....
    return r;
}
The request is finally added to outgoingQueue:
public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    synchronized (state) {
        ...
        ....
        outgoingQueue.add(packet);
    }
    ....
    return packet;
}
The request is eventually sent to the server, and the response is handled in SendThread#readResponse:
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");
    switch (replyHdr.getXid()) {
    case PING_XID:
        ....
        ....
        return;
    case AUTHPACKET_XID:
        ...
        ...
        return;
    // handle a notification from the server
    case NOTIFICATION_XID:
        LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0) {
                event.setPath("/");
            } else if (serverPath.length() > chrootPath.length()) {
                event.setPath(serverPath.substring(chrootPath.length()));
            } else {
                LOG.warn("Got server path {} which is too short for chroot path {}.", event.getPath(), chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event);
        LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
        // queue the event; EventThread processes it
        eventThread.queueEvent(we);
        return;
    default:
        break;
    }
    // remove the Packet that this reply answers
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        ....
        .....
    } finally {
        // store the Watcher in the ClientWatchManager
        finishPacket(packet);
    }
}
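The chroot handling appears in both directions: getData prepends the chroot before sending, and the NOTIFICATION_XID branch strips it from incoming paths. The sketch below isolates that translation as two pure functions. ChrootPaths, toServerPath and toClientPath are hypothetical names for illustration; the client-to-server direction is an assumption about what prependChroot does, since its body is not shown here.

```java
// Hypothetical helper that isolates the chroot path translation.
final class ChrootPaths {
    // Client -> server: put the chroot in front of the client-visible path.
    // Assumption: the client root "/" maps to the chroot itself.
    static String toServerPath(String chroot, String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        return clientPath.equals("/") ? chroot : chroot + clientPath;
    }

    // Server -> client: mirrors the branch in the notification handling above.
    static String toClientPath(String chroot, String serverPath) {
        if (chroot == null) {
            return serverPath;
        }
        if (serverPath.equals(chroot)) {
            return "/";
        }
        if (serverPath.length() > chroot.length()) {
            return serverPath.substring(chroot.length());
        }
        // Path shorter than the chroot: the code above only logs a warning
        // and leaves the path unchanged, so this sketch does the same.
        return serverPath;
    }

    public static void main(String[] args) {
        System.out.println(toServerPath("/app", "/config"));     // /app/config
        System.out.println(toClientPath("/app", "/app/config")); // /config
        System.out.println(toClientPath("/app", "/app"));        // /
    }
}
```

Because the translation happens before the event reaches EventThread, a Watcher only ever sees the same client-side path it registered with.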
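What this method mainly does:
- Deserialize the reply header and read its XID to determine whether this is a notification from the server. If it is, queue the event for EventThread to process.
- Otherwise, remove the corresponding Packet from pendingQueue.
- Call finishPacket for follow-up processing.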
protected void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    ...
    ...
}
Finally, back in WatchRegistration, the Watcher is registered in the corresponding Map<String, Set<Watcher>>.
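The last step can be sketched as a small registry that stores a Watcher under its client path only when the reply carried no error. ClientWatchRegistry and its nested Watcher interface are hypothetical; the rule "register only when err == 0" is a simplifying assumption for this sketch, and the real WatchRegistration subclasses may decide differently per operation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side registry: path -> watchers registered on that path.
final class ClientWatchRegistry {
    interface Watcher {
        void process(String path);
    }

    private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();

    // Called once the reply for the request has arrived.
    // Assumption: only a reply without error (err == 0) registers the watcher.
    boolean register(int err, String clientPath, Watcher watcher) {
        if (err != 0) {
            return false;
        }
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
        }
        return true;
    }

    int watcherCount(String clientPath) {
        synchronized (dataWatches) {
            Set<Watcher> watchers = dataWatches.get(clientPath);
            return watchers == null ? 0 : watchers.size();
        }
    }

    public static void main(String[] args) {
        ClientWatchRegistry registry = new ClientWatchRegistry();
        Watcher watcher = path -> System.out.println("changed: " + path);
        System.out.println(registry.register(0, "/config", watcher));   // true
        System.out.println(registry.register(-1, "/missing", watcher)); // false (-1 is an arbitrary error code)
        System.out.println(registry.watcherCount("/config"));           // 1
    }
}
```

Registering after the reply, rather than when the request is queued, means a failed request never leaves a stale Watcher behind on the client.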
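Server handles the Watcher
First, the main component classes.
WatchManager manages Watchers on the ZK server. It maintains two structures, watchTable and watch2Paths, which index the stored Watchers along two dimensions.
- watchTable indexes by data node path: it maps each path to the Watchers registered on it.
- watch2Paths indexes by Watcher: it maps each Watcher to the data node paths it is registered on.
ServerCnxn is the interface for a connection between a ZooKeeper client and the server; it represents one client-server connection. Its default implementation is NIOServerCnxn, and a Netty-based implementation, NettyServerCnxn, was introduced in 3.4.0.
ServerCnxn also implements the Watcher interface, so it can be treated as a Watcher object.
Both the data node path and the ServerCnxn are stored in WatchManager.
After the server receives a client request, FinalRequestProcessor#processRequest determines whether the request needs to register a Watcher.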
case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
    path = getDataRequest.getPath();
    // call the method that handles getData requests
    rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
    requestPathMetricsCollector.registerRequest(request.type, path);
    break;
}

private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    ....
    ....
    // Note: the request only carries a boolean that says whether a Watcher should be registered.
    // If it is set, the server registers the ServerCnxn itself as the Watcher.
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

// ZKDatabase: delegates to the DataTree
public byte[] getData(String path, Stat stat, Watcher watcher) {
    return dataTree.getData(path, stat, watcher);
}

// DataTree
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    byte[] data = null;
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            // dataWatches is the instance behind the IWatchManager interface
            dataWatches.addWatch(path, watcher);
        }
        data = n.data;
    }
    updateReadStat(path, data == null ? 0 : data.length);
    return data;
}
The Watcher is finally stored in watchTable and watch2Paths:
@Override
public boolean addWatch(String path, Watcher watcher) {
    return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
}

@Override
public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    if (isDeadWatcher(watcher)) {
        return false;
    }
    // look up the set of Watchers for this path, creating it if needed
    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);
    // record the reverse mapping: Watcher -> paths
    Set<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        paths = new HashSet<>();
        watch2Paths.put(watcher, paths);
    }
    watcherModeManager.setWatcherMode(watcher, path, watcherMode);
    return paths.add(path);
}
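One way to see why two indexes are kept is to look at an operation that needs the reverse direction, such as dropping every watch that belongs to one connection. The sketch below is a simplified model with hypothetical names (TwoIndexWatchStore, removeWatcher, and plain strings standing in for connections); it is not the WatchManager implementation, and the cleanup scenario is an assumption used for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a store that indexes watches by path and by watcher.
final class TwoIndexWatchStore {
    private final Map<String, Set<String>> watchTable = new HashMap<>();  // path -> watchers
    private final Map<String, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    // Returns true if this (watcher, path) pair was not registered before.
    synchronized boolean addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        return watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // Drop all watches of one watcher. The reverse index gives its paths
    // directly, so the whole watchTable does not have to be scanned.
    synchronized void removeWatcher(String watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<String> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }

    synchronized int watchersOn(String path) {
        Set<String> watchers = watchTable.get(path);
        return watchers == null ? 0 : watchers.size();
    }

    synchronized int pathsOf(String watcher) {
        Set<String> paths = watch2Paths.get(watcher);
        return paths == null ? 0 : paths.size();
    }

    public static void main(String[] args) {
        TwoIndexWatchStore store = new TwoIndexWatchStore();
        store.addWatch("/a", "conn-1");
        store.addWatch("/b", "conn-1");
        store.addWatch("/a", "conn-2");
        store.removeWatcher("conn-1");
        System.out.println(store.watchersOn("/a") + " " + store.watchersOn("/b")); // 1 0
    }
}
```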
Triggering a Watcher
NodeDataChanged is triggered when a node's data content or its dataVersion changes.
So let's look at the org.apache.zookeeper.server.DataTree#setData method:
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte[] lastdata = null;
    synchronized (n) {
        lastdata = n.data;
        nodes.preChange(path, n);
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
        nodes.postChange(path, n);
    }
    ....
    ....
    updateWriteStat(path, dataBytes);
    // call the IWatchManager method to fire the data watches
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
    return triggerWatch(path, type, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
    // wrap the event in a WatchedEvent
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    Set<Watcher> watchers = new HashSet<>();
    PathParentIterator pathParentIterator = getPathParentIterator(path);
    synchronized (this) {
        for (String localPath : pathParentIterator.asIterable()) {
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            // nobody is watching this path
            if (thisWatchers == null || thisWatchers.isEmpty()) {
                continue;
            }
            Iterator<Watcher> iterator = thisWatchers.iterator();
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                if (watcherMode.isRecursive()) {
                    // (body omitted in this excerpt)
                } else if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher);
                    if (!watcherMode.isPersistent()) {
                        // remove the Watcher from watchTable
                        iterator.remove();
                        Set<String> paths = watch2Paths.get(watcher);
                        if (paths != null) {
                            // remove the path from watch2Paths as well
                            paths.remove(localPath);
                        }
                    }
                }
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // call the process method
        w.process(e);
    }
    .....
    .....
    return new WatcherOrBitSet(watchers);
}
As mentioned above, ServerCnxn implements the Watcher interface, so let's look at org.apache.zookeeper.server.NIOServerCnxn#process:
@Override
public void process(WatchedEvent event) {
    // the reply header XID is set to NOTIFICATION_XID (-1), which SendThread.readResponse checks for, as analyzed above
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
    // convert the WatchedEvent into a WatcherEvent
    WatcherEvent e = event.getWrapper();
    // send the notification to the client
    sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
}
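Basic flow
- Wrap the event in a WatchedEvent.
- Look up the matching Watchers in watchTable, and remove the related Watchers and paths from both watchTable and watch2Paths (a non-persistent watch fires only once).
- Call the process method.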
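The three steps above can be sketched as a small model in which the lookup and the removal happen together under a lock, and the callbacks run afterwards outside it. OneShotTrigger and its nested Watcher interface are hypothetical; the sketch leaves out watcher modes, parent paths and the suppress set that the real triggerWatch handles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one-shot triggering on the server side.
final class OneShotTrigger {
    interface Watcher {
        void process(String eventType, String path);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // Returns how many watchers were notified.
    int triggerWatch(String path, String eventType) {
        Set<Watcher> fired;
        synchronized (this) {
            // One-shot: the watchers are removed in the same step that finds them.
            fired = watchTable.remove(path);
        }
        if (fired == null) {
            return 0;
        }
        // Callbacks run outside the lock, on the detached set.
        for (Watcher watcher : fired) {
            watcher.process(eventType, path);
        }
        return fired.size();
    }

    public static void main(String[] args) {
        OneShotTrigger manager = new OneShotTrigger();
        List<String> seen = new ArrayList<>();
        manager.addWatch("/cfg", (type, path) -> seen.add(type + " " + path));
        System.out.println(manager.triggerWatch("/cfg", "NodeDataChanged")); // 1
        System.out.println(manager.triggerWatch("/cfg", "NodeDataChanged")); // 0
        System.out.println(seen);
    }
}
```

The second trigger notifies nobody because the first one already consumed the watch.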
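Client calls back the Watcher
First, let's get to know the EventThread class.
It extends Thread and uses a LinkedBlockingQueue<Object> named waitingEvents to hold the events waiting to be processed; its run method keeps taking events from the queue and processing them.
We already know that on the client the notification is handled in SendThread#readResponse (this snippet also appeared above, in the section on the client registering a Watcher):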
case NOTIFICATION_XID:
    LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0) {
            event.setPath("/");
        } else if (serverPath.length() > chrootPath.length()) {
            event.setPath(serverPath.substring(chrootPath.length()));
        } else {
            LOG.warn("Got server path {} which is too short for chroot path {}.", event.getPath(), chrootPath);
        }
    }
    WatchedEvent we = new WatchedEvent(event);
    LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
    // queue the event; EventThread processes it
    eventThread.queueEvent(we);
    return;
The event is added to the waitingEvents queue:
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // fetch the matching Watchers from the ClientWatchManager; this also removes them
        // from the corresponding map, so client-side watches are one-time as well
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // add to waitingEvents; the run method takes it out and processes it
    waitingEvents.add(pair);
}
The run method
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            ......
            ......
        }
    }
    ......
}

private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    // call process; watchers are handled serially and synchronously
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher.", t);
                }
            }
        }
    }
    .......
    .......
}
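The shape of this loop, a single thread draining a blocking queue, a sentinel object that stops it, and a try/catch around each callback, can be reproduced with the standard library alone. SerialEventLoop below is a hypothetical simplified model and not EventThread itself; in particular it stops as soon as it sees the sentinel, and it queues plain Runnable callbacks instead of WatcherSetEventPair objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of a serial callback loop fed by a blocking queue.
final class SerialEventLoop {
    private static final Object EVENT_OF_DEATH = new Object(); // sentinel that stops the loop
    private final BlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::run, "event-loop");

    void start() {
        worker.start();
    }

    void queueEvent(Runnable callback) {
        waitingEvents.add(callback);
    }

    // Queue the sentinel behind all pending events and wait for the worker to exit.
    void shutdownAndWait() {
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    return;
                }
                try {
                    ((Runnable) event).run(); // callbacks run one at a time, in queue order
                } catch (Throwable t) {
                    // A failing callback is reported but does not stop the loop.
                    System.err.println("Error while calling watcher: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs three callbacks, the second of which throws, and returns what was recorded.
    static List<String> demo() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        SerialEventLoop loop = new SerialEventLoop();
        loop.start();
        loop.queueEvent(() -> order.add("first"));
        loop.queueEvent(() -> { throw new IllegalStateException("boom"); });
        loop.queueEvent(() -> order.add("third"));
        loop.shutdownAndWait();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, third]
    }
}
```

Since every callback shares the one worker thread, a callback that blocks delays every event queued behind it.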
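Summary
Watcher characteristics
- One-time: on both the client and the server, once a Watcher is triggered it is removed from storage.
- Serial execution on the client: callbacks run synchronously, one after another, so never let a single slow Watcher hold up all of the client's other Watcher callbacks.
- Lightweight: WatchedEvent is the smallest unit of the notification mechanism and contains only three parts: notification state, event type, and node path. The node's content is not delivered with the notification; after receiving the notification, the client has to fetch the data from the server itself.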
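Taken together, the one-time and lightweight properties mean that a client that wants to keep following a node has to read the data again and register a new watch each time it is notified. The sketch below shows that pattern against a tiny in-memory store. TinyStore, ConfigListener and RewatchDemo are hypothetical names, and the store only imitates the behavior described in this article (one-shot watches, path-only notifications, a version bump on every write); it is not the ZooKeeper client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RewatchDemo {
    interface Watcher {
        void process(String path); // the notification carries the path only, no data
    }

    // Hypothetical in-memory store with one-shot data watches.
    static final class TinyStore {
        private final Map<String, String> data = new HashMap<>();
        private final Map<String, Integer> versions = new HashMap<>();
        private final Map<String, Set<Watcher>> watches = new HashMap<>();

        String getData(String path, Watcher watcher) {
            if (watcher != null) {
                watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
            }
            return data.get(path);
        }

        void setData(String path, String value) {
            data.put(path, value);
            versions.merge(path, 1, Integer::sum);    // bumped even if the value is unchanged
            Set<Watcher> fired = watches.remove(path); // one-shot: remove before notifying
            if (fired != null) {
                for (Watcher watcher : fired) {
                    watcher.process(path);
                }
            }
        }

        int version(String path) {
            return versions.getOrDefault(path, 0);
        }
    }

    // Re-reads the value and re-registers itself every time it is notified.
    static final class ConfigListener implements Watcher {
        private final TinyStore store;
        final List<String> seen = new ArrayList<>();

        ConfigListener(TinyStore store) {
            this.store = store;
        }

        void start(String path) {
            seen.add(store.getData(path, this));
        }

        @Override
        public void process(String path) {
            seen.add(store.getData(path, this)); // fetch the data AND set the next watch
        }
    }

    static List<String> demo() {
        TinyStore store = new TinyStore();
        store.setData("/cfg", "v1");
        ConfigListener listener = new ConfigListener(store);
        listener.start("/cfg");
        store.setData("/cfg", "v2");
        store.setData("/cfg", "v2"); // same content, still notifies
        return listener.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [v1, v2, v2]
    }
}
```

If process did not pass itself back into getData, the listener would see only the first change and miss every later one.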