Zookeeper watcher
在 API 层面,如果客户端需要实现 watcher,就像把 zk 用作注册中心、配置中心的场景那样:zk server 上的配置变更和服务地址变更都需要通知到客户端,也就是告诉所有相关的客户端“你关注的数据发生了变化,需要采取一些行动”。这本质上就是一个通知机制。
// standrd 标准监听 (一次性监听) ZooKeeper zooKeeper=new ZooKeeper("", 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //表示连接成功之后,会产生的回调时间 } }); Stat stat=new Stat(); zooKeeper.getData("/first", new DataWatchListener(),stat); //针对当前节点 class DataWatchListener implements Watcher{ @Override public void process(WatchedEvent watchedEvent) { String path=watchedEvent.getPath(); //再次注册监听 try { zooKeeper.getData(path,this,new Stat()); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } // 在3.6.1 还有着新的监听 持久化监听 和 持久化递归监听 // 持久化监听:只需要注册一次事件 // 持久化递归监听:其子节点发生变化,都会触发监听 // 默认情况下,是递归持久化监听 ZooKeeper.addWatch("path",new DataWatchListener(),Add.WatchMode.PERSISTENT_RECURSIVE)
首先客户端要发起一个请求。客户端的所有请求先放入阻塞队列中,然后由一个 SendThread 线程去轮询队列(通过 take 的方式)取出请求;可以看出,这种异步的方式能够在很大程度上提升整体的处理性能。放入队列的任务是一个 request,它可以是 crud、exists 等操作。后续请求的发送实际上走的是 NIO:请求会先被序列化(zk 自己实现了一套序列化机制),然后再发送到服务端。这个请求要做的就是注册 watcher,携带的内容是 path 和 watch:true。请求发送到服务端之后,由于客户端和服务端建立好连接以后会维持这个会话,所以最终服务端会保存这个 watcher,并将其存储在 HashMap 中。
之所以是 HashMap<String, HashSet<Watcher>>(path -> watcher 集合),是因为同一个 path 可能被多个客户端监听。
然后客户端这边也需要对事件进行管理:提供了一个类 ZKWatchManager,通过一些集合去存储客户端这边各个 path 所对应的 watcher。
zookeeper.exists();
---------------------------------------------------------------------------------------
/**
 * Client side: sends an "exists" request for {@code path} and, when {@code watcher}
 * is non-null, asks the server to set a watch (setWatch(true)).
 *
 * @param path    client-visible path; validated, then chroot-prefixed before sending
 * @param watcher watcher to register, or null for no watch
 * @return the Stat from the response, or null when the reply error is NONODE or the
 *         returned Stat has czxid == -1
 * @throws KeeperException for any other non-zero error code in the reply header
 * @throws InterruptedException declared here; NOTE(review): presumably raised by the
 *         blocking cnxn.submitRequest call - confirm
 */
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath); // validate the node path

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    // request header
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
    // request object
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath); // first
    request.setWatch(watcher != null); // true/false
    // response object
    SetDataResponse response = new SetDataResponse();
    // cnxn is the class responsible for the network communication
    // NOTE(review): four-argument call; the submitRequest quoted below takes five
    // parameters, so this presumably goes through another overload - confirm.
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) { // decide the outcome from the returned error code
        if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
            return null;
        }
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    // return the stat
    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
---------------------------------------------------------------------------------------
/**
 * Queues the request as a Packet and blocks the caller until the packet is finished
 * (with a timeout when requestTimeout > 0, otherwise indefinitely).
 *
 * @return the reply header {@code r} that was attached to the queued packet
 * @throws InterruptedException if the wait on the packet is interrupted
 */
public ReplyHeader submitRequest(
    RequestHeader h,
    Record request,
    Record response,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // Build the Packet carrying the content to transmit; queuePacket puts the packet
    // on the queue, which matches the diagram described in these notes.
    Packet packet = queuePacket(
        h,
        r,
        request,
        response,
        null,
        null,
        null,
        null,
        watchRegistration,
        watchDeregistration);
    synchronized (packet) { // lock on the packet
        if (requestTimeout > 0) { // a request timeout is set: wait with a timeout
            // Wait for request completion with timeout
            waitForPacketFinish(r, packet);
        } else {
            // Wait for request completion infinitely
            while (!packet.finished) { // keep calling wait until the packet has been processed
                packet.wait(); // blocks
            }
        }
    }
    if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
        sendThread.cleanAndNotifyState();
    }
    return r;
}
---------------------------------------------------------------------------------------
/**
 * Wraps the request in a Packet and adds it to outgoingQueue, unless the connection
 * state is not alive or the client is closing, in which case conLossPacket is called
 * on it instead. Then notifies the client socket that a packet was added.
 *
 * @return the created packet
 */
public Packet queuePacket(
    RequestHeader h,
    ReplyHeader r,
    Record request,
    Record response,
    AsyncCallback cb,
    String clientPath,
    String serverPath,
    Object ctx,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) {
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    // The synchronized block here is for two purpose:
    // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
    // 2. synchronized against each packet. So if a closeSession packet is added,
    // later packet will be notified.
    synchronized (state) {
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // add to the blocking queue
            outgoingQueue.add(packet);
        }
    }
    // Wake up the thread blocked in selector.select
    // So where is sendThread initialized? (see the ZooKeeper constructor quoted next)
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
---------------------------------------------------------------------------------------
// Inside the ZooKeeper constructors
public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider aHostProvider) throws IOException {
    this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
}

public ZooKeeper(
    .........
    // the connection between client and server
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}

/** Starts the two client threads: the send thread and the event thread. */
public void start() {
    // send thread
    sendThread.start();
    // event thread (the thread that fires events):
    // once the server has triggered an event and notified the client, the client reads
    // the watcher from its local event list and invokes the callback
    eventThread.start();
}
// What follows is a pile of nio / netty plumbing; not covered here
---------------------------------------------------------------------------------------
/**
 * Deserializes the request header from {@code incomingBuffer} and dispatches on the
 * request type; ordinary requests are wrapped in a Request and handed to submitRequest.
 * NOTE(review): per these notes this is the server-side entry point - confirm.
 *
 * @throws IOException declared here; the inline comment below says the size check
 *         throws it when a request is rejected
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header"); // deserialize the header

    cnxn.incrOutstandingAndCheckThrottle(h);

    incomingBuffer = incomingBuffer.slice();
    // handle the request differently depending on its type
    if (h.getType() == OpCode.auth) {
        // authorization (body omitted in these notes)
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            submitRequest(si); // submit the request (this is where the asynchrony comes in)
        }
    }
}

/** Delegates to enqueueRequest. */
public void submitRequest(Request si) {
    enqueueRequest(si);
}

/**
 * Hands the request to requestThrottler; if the throttler is not set yet, waits in
 * one-second steps while the state is INITIAL and fails if it is still null afterwards.
 *
 * @throws RuntimeException ("Not started") if requestThrottler is still null after waiting
 */
public void enqueueRequest(Request si) {
    // somewhat like rate-limiting logic
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}

/** Adds the request to submittedRequests, or drops it when the throttler is stopping. */
public void submitRequest(Request request) {
    if (stopping) { // the server is shutting down, so this request is dropped
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        submittedRequests.add(request);
    }
}
---------------------------------------------------------------------------------------
// Here, finally, is the thread that keeps polling
/**
 * Consumer loop: takes requests from submittedRequests, applies throttling while
 * maxRequests > 0, and passes surviving requests to zks.submitRequestNow. On exit it
 * drains the queue and logs how many requests were dropped.
 */
public void run() {
    try {
        while (true) {
            if (killed) {
                break;
            }
            // at this point the classic producer/consumer structure becomes clear
            Request request = submittedRequests.take();
            if (Request.requestOfDeath == request) {
                break;
            }

            if (request.mustDrop()) {
                continue;
            }

            // Throttling is disabled when maxRequests = 0
            // i.e. whether the throttle is switched off; 0 means off
            if (maxRequests > 0) {
                while (!killed) {
                    if (dropStaleRequests && request.isStale()) {
                        // Note: this will close the connection
                        dropRequest(request);
                        ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
                        request = null;
                        break;
                    }
                    // throttling check
                    if (zks.getInProcess() < maxRequests) {
                        break;
                    }
                    // wait
                    throttleSleep(stallTime);
                }
            }

            if (killed) {
                break;
            }

            // A dropped stale request will be null
            if (request != null) {
                if (request.isStale()) {
                    ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                }
                zks.submitRequestNow(request);
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected interruption", e);
    }
    int dropped = drainQueue();
    LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}

/**
 * Passes a valid request to firstProcessor (the head of the processor chain), waiting
 * first for the chain to be set up if firstProcessor is still null.
 *
 * @throws RuntimeException ("Not started") if the chain is missing or the state is not
 *         RUNNING after waiting
 */
public void submitRequestNow(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) { // the packet type is valid
            setLocalSessionFlag(si);
            // the request is handled by a chain of processors:
            // PrepRequestProcessor(SyncRequestProcessor(FinalRequestProcessor))
            //
            firstProcessor.processRequest(si);
---------------------------------------------------------------------------------------
            // (Aside spliced into the notes) builds the request-processing chain.
            // Chain in standalone mode: PrepRequestProcessor(SyncRequestProcessor(FinalRequestProcessor))
            protected void setupRequestProcessors() {
                // the final processor
                RequestProcessor finalProcessor = new FinalRequestProcessor(this);
                // SyncRequestProcessor: the sync processor, syncs data to the local disk
                RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
                // the sync processor eventually writes to the snapshot file, so a disk-sync
                // policy has to be chosen;
                // it is essentially a trade-off between performance and consistency
                ((SyncRequestProcessor) syncProcessor).start();
                // PrepRequestProcessor: the pre-processor
                firstProcessor = new PrepRequestProcessor(this, syncProcessor);
                ((PrepRequestProcessor) firstProcessor).start();
            }
---------------------------------------------------------------------------------------
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        LOG.debug("Dropping request.", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
---------------------------------------------------------------------------------------
// Fragment of a request-type switch (the enclosing method is not shown in these notes).
case OpCode.exists: {
    lastOp = "EXIS";
    // TODO we need to figure out the security requirement for this!
    ExistsRequest existsRequest = new ExistsRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
    path = existsRequest.getPath();
    if (path.indexOf('\0') != -1) {
        throw new KeeperException.BadArgumentsException();
    }
    // Get the stat through zk.
    // This shows that the watcher stored in the Set<Watcher> of the HashMap from the
    // diagram is really the network connection object (cnxn is passed as the watcher).
    // It is done this way because, when the path changes, every watcher has to be told;
    // remembering the connection is enough to send the data back.
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat);
    requestPathMetricsCollector.registerRequest(request.type, path);
    break;
}

/**
 * Registers {@code watcher} (when non-null) in dataWatches for {@code path}, then
 * returns a copy of the node's stat.
 *
 * @throws KeeperException.NoNodeException if no node exists at {@code path}; note that
 *         the watch has already been added at that point
 */
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
    Stat stat = new Stat();
    DataNode n = nodes.get(path);
    // this is where the registration step from the diagram actually happens
    if (watcher != null) { // server-side registration
        dataWatches.addWatch(path, watcher);
    }
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
    }
    updateReadStat(path, 0L);
    return stat;
}

/** Adds a watch using WatcherMode.DEFAULT_WATCHER_MODE. */
public boolean addWatch(String path, Watcher watcher) {
    /**
     * watcher: the connection that is registering the watch
     * path: the path being watched
     */
    return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
}

// Fragment of the WatcherMode enum: its constants and the default mode (STANDARD).
STANDARD(false, false),
PERSISTENT(true, false),
PERSISTENT_RECURSIVE(true, true)
;

public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD;

// What follows is simply the storing step
// mapping from a node path to its set of watchers
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
// mapping from a watcher to all the paths it watches
private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

/**
 * Records the watch in both maps (path -> watchers, watcher -> paths) and stores its
 * mode in watcherModeManager.
 *
 * @return false if the watcher is dead; otherwise the result of adding {@code path} to
 *         the watcher's path set (true only if it was not already present)
 */
public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    if (isDeadWatcher(watcher)) {
        LOG.debug("Ignoring addWatch with closed cnxn");
        return false;
    }

    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<>(4);
        // store it
        watchTable.put(path, list);
    }
    list.add(watcher);

    Set<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<>();
        watch2Paths.put(watcher, paths);
    }
    // set the watch mode
    watcherModeManager.setWatcherMode(watcher, path, watcherMode);

    return paths.add(path);
}
// At this point the server-side storing of the watch is complete
---------------------------------------------------------------------------------------
// SendThread has a method called readResponse
/**
 * Reads a reply header from {@code incomingBuffer}, matches it against the head of
 * pendingQueue, copies xid/err/zxid into the packet's reply header, deserializes the
 * response body when err == 0, and always finishes the packet.
 *
 * @throws IOException if pendingQueue is empty or the reply Xid differs from the Xid of
 *         the packet at the head of the queue
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    replyHdr.deserialize(bbia, "header");
    // the bodies of the special-Xid cases are elided in these notes
    switch (replyHdr.getXid()) {
    case PING_XID:
        ......
    case AUTHPACKET_XID:
        ......
    case NOTIFICATION_XID:
        ......
    default:
        break;
    }

    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (tunnelAuthInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
        return;
    }

    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
                                  + " with err " + replyHdr.getErr()
                                  + " expected Xid " + packet.requestHeader.getXid()
                                  + " for a packet with details: " + packet);
        }

        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }

        LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
    } finally {
        finishPacket(packet);
    }
}

在zookeeper.class中
/**
 * Client side: when shouldAddWatch(rc) holds, stores the watcher in the set kept for
 * clientPath inside the map returned by getWatches(rc), creating the set if needed.
 *
 * @param rc result code of the request (passed to shouldAddWatch and getWatches)
 */
public void register(int rc) {
    if (shouldAddWatch(rc)) { // the server has established the mapping, so record it on the client side too
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}
/*
    The rest of the flow: if the server replies with success, the watch has been stored
    and the relationship is established. As for how the watch is triggered:
    when the server detects a data change, it looks up the matching watcher on the server
    side and pushes a message; the client receives the message, checks its type, finds the
    watcher through its own mapping and invokes the callback.
 */