深入了解 zookeeper 的 watcher 机制!

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 我们可以使用 zookeeper 作为注册中心来实现服务的注册与发现,curator 框架提供了 curator-x-discovery 扩展实现了开箱即用的服务注册发现,但更多时候我们还是选择自己去实现,那这个时候我们需要额外关注 zookeeper 的 1 个特性,即 wathcer。

我们可以使用 zookeeper 作为注册中心来实现服务的注册与发现,curator 框架提供了 curator-x-discovery 扩展实现了开箱即用的服务注册发现,但更多时候我们还是选择自己去实现,那这个时候我们需要额外关注 zookeeper 的 1 个特性,即 wathcer。


在微服务场景中,watcher 机制主要提供了服务通知功能,比如 Instance1 这个实例在 Service1 服务节点下注册了一个 emphemeral 子节点后,它的某个服务消费者根据依赖配置在 Service1 节点上注册了一个子节点 watcher,就如图中的红钥匙。


子节点类型的 watcher 会观测 Service1 的子节点,即 InstanceX 节点,但不会观测孙子节点 config1。那么当 Instance1 节点挂掉之后,watcher 可以做到通知给注册自身的那个服务消费者,通知完一次后 wacther 也就被销毁了。


image.png


wacther 原理框架

image.png


zookeeper 的 watcher 主要由 client、server 以及 watchManager 之间配合完成,包括 watcher 注册以及触发 2 个阶段。


在 client 端注册表为 ZkWatchManager,其中包括了 dataWatches、existWatches 以及 childWatches。在 server 端的注册表在 DataTree 类中,封装了 2 类 WatchManager,即 dataWatches 和 existWatches。dataWatches 代表当前节点的数据监听,childWathes 代表子节点监听,与 client 比少的 existWatches 也很容易理解,因为节点是否存在需要客户端去判断。


注册阶段客户端的 getData 和 exists 请求可以注册 dataWatches,getChilden 可以注册 childWatches。而触发阶段,setData 请求会触发当前节点 dataWatches,create 请求会触发当前节点 dataWatches 以及父节点的 childWatches,delete 请求则会触发当前节点、父节点、子节点的 dataWatches,以及父节点的 childWatches。


watchManager包含两个非常重要的数据结构:watchTable和watch2Paths。前者表示path-set ,后者表示watcher-set 。注意这里的watcher含义表示远程连接,所以watchTable表示一个目录下可能有多个消费者的监听连接,而watch2Paths表示一个消费者可能会对多个目录建立监听,显然多目录的监听会复用一个连接。


请求阶段的传输数据(包括 watcher 信息)会封装在 request 和 response 中,比如 getData 请求会封装 getDataRequest/getDataResponse。而触发阶段的 watcher 通知则通过事件 event 进行通信,server 端会发送一个 watcherEvent,而 client 端则会将其转换成 watchedEvent 再进行处理。


每个客户端都会维护 2 个线程,SendThread 负责处理客户端与服务端的请求通信,比如发送 getDataRequest,而 EventThread 则负责处理服务端的事件通知,即 watcher 的事件。


watcher 注册源码

我们来看看 watcher 注册的部分源码。首先是在客户端,以 Zookeeper 中 getData 方法为例,会入队一个 watch 为 true 的 packet。

public byte[] getData(final String path, Watcher watcher, Stat stat)
      throws KeeperException, InterruptedException {
    ...
        GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    ...
}

可以看到这边封装了 GetDataRequest 以及 GetDataResponse,而 request 中设置了 watch 参数为 true,最后将其进行 submitRequest,submitRequest 干的事儿其实就是将这些放入事件队列等待 sendThread 调度发送。


接着这个请求会被服务端所接收到,所有请求的服务端处理都在 FinalRequestProcessor#processRequest 方法中进行。

case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ...
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                                               getDataRequest.getWatch() ? cnxn : null);
    ...
}

这边会通过一些 case 来判断请求类型,还是以 getData 为例,最终会调用到 DataTree 的 getData 方法,我们之前讲到 DataTree 里包含了 2 种 watcher,那这边除了获取数据外,自然是注册 dataWatchers 了。

 public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
     DataNode n = (DataNode)this.nodes.get(path);
     if (n == null) {
         throw new NoNodeException();
     } else {
         synchronized(n) {
             n.copyStat(stat);
             if (watcher != null) {
                 this.dataWatches.addWatch(path, watcher);
             }
             return n.data;
         }
     }
 }

addWatch 方法主要是将数据节点的路径以及 ServerCnxn(远程通信信息) 信息存储到 WatchManager 的 watchTable 和 watch2Paths 中。至此服务端已经接受到了 watcher 并注册到了 watchManager 中了。


我们将客户端自己也会保存一个 watchManager,这里其实是在接收到 getData 响应后进行的,在 ClientCnxn$SendThread 类的 readResponse->finishPacket 方法中。

private void finishPacket(ClientCnxn.Packet p) {
     if (p.watchRegistration != null) {
         p.watchRegistration.register(p.replyHeader.getErr());
     }
     if (p.cb == null) {
         synchronized(p) {
             p.finished = true;
             p.notifyAll();
         }
     } else {
         p.finished = true;
         this.eventThread.queuePacket(p);
     }
}

可以看到这边调用了 watchRegistration 的 register 方法,而它就是根据请求类型来装入对应的 watchManager 中了(dataWatches、existWatches、childWatches)。


整个大致的时序图可以参考下面:


image.png


watcher 触发源码

wathcer 触发部分,我们还以 服务端 DataTree 类处理 setData 请求 为例。

public Stat setData(String path, byte data[], int version, long zxid,
           long time) throws KeeperException.NoNodeException {
    ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

可以看到在处理完数据后调用了 triggerWatch,它干的事儿是从之前的 watchManager 中获得 watchers,然后一个个调用 process 方法。

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
                                      KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                                         ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                         "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

获取了需要本次触发的监听后,在 watchTable 和 watch2Paths 中还移除了自身,所以 watcher 是单次的。这里封装好了 watchedEvent 后塞入到了 Watcher的process 方法中,process 方法其实就是发送通知,以 Watcher的一个实现类NioServerCnxn 为例就是调用了其 sendResponse 方法将通知事件发送到客户端,发送前会将 watchedEvent 转换成 watcherEvent 进行发送。


那么客户端首先接收到请求的仍然是 ClientCnxn$sendThread 的 readResponse 方法,这里讲 watcherEvent 转换为 watchedEvent 后入列 eventThread 的事件队列 等待后续进行处理。

...
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled()) {
    ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
}
ClientCnxn.this.eventThread.queueEvent(we);
...

我们直接看下 EventThread 的 run 方法吧,方法很简单,就是不断从 waitingEvents 事件队列中取通知事件。然后调用 processEvent 方法处理事件。

private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else {
            ...省略
        }

这里就是简单地取出本次事件需要通知的 watcher 集合,然后循环调用每个 watcher 的 process 方法了。那么在自己实现服务注册发现的场景里,显然 watcher 的 process 方法是我们自定义的啦。


整个 watcher 触发的时序图可以参考下面:


image.png


至此,zookeeper 的整个 watcher 交互逻辑就已经结束了。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
网络协议 中间件 数据库
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
147 0
|
1月前
|
存储 API
深入理解Zookeeper系列-4.Watcher原理
深入理解Zookeeper系列-4.Watcher原理
32 1
|
9月前
|
消息中间件 存储 分布式计算
消息队列kafka及zookeeper机制
消息队列kafka及zookeeper机制
157 1
|
11月前
|
算法
Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
103 0
|
存储 设计模式 监控
Apache ZooKeeper - Watch 机制的底层原理
Apache ZooKeeper - Watch 机制的底层原理
90 0
|
Apache
Apache ZooKeeper - 事件监听机制详解
Apache ZooKeeper - 事件监听机制详解
80 0
|
Go 数据安全/隐私保护 微服务
48-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-3[Zookeeper])
  前面已经分析了Watcher机制中的大多数类,本篇对于ZKWatchManager的外部类Zookeeper进行分析。
129 0
|
缓存 安全 微服务
47-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-2[WatchManager])
前面已经分析了Watcher机制中的第一部分,即在org.apache.zookeeper下的相关类,接着来分析org.apache.zookeeper.server下的WatchManager类。
128 0
|
微服务
46-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-1)
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
72 0
|
NoSQL Java API
一文带你理解Zookeeper实现分布式锁的机制
一文带你理解Zookeeper实现分布式锁的机制
306 0
一文带你理解Zookeeper实现分布式锁的机制