Zookeeper Watcher 流程分析(结合源码)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: ZK提供了分布式数据的发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某个主题对象,当这个主题对象自身状态发生变化时,会通知所有的订阅者。在ZK中引入了 Watcher 机制来实现这种分布式的通知功能。

概述


ZK提供了分布式数据的发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某个主题对象,当这个主题对象自身状态发生变化时,会通知所有的订阅者。在ZK中引入了 Watcher 机制来实现这种分布式的通知功能

ZK允许客户端向服务器端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher ,那么就会向指定客户端发送一个事件通知来实现分布式通知功能。

大致流程就是 Client 向ZK中注册 Watcher,如果注册成功的话,会将对应的 Watcher 存储在本地。当ZK服务器端触发 Watcher 事件之后,会向客户端发送通知,客户端会从 ClientWatchManager 中取出对应的 Watcher 进行回调。


Watcher 接口


说了那么久、Watcher 究竟是啥?有什么用处?

/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 */
@InterfaceAudience.Public
public interface Watcher {
    void process(WatchedEvent event);
}
复制代码

只要你通过这个接口的实现类对象去向ZK服务端注册监听,那么当有ZK服务端有事件通知到Client,那么就会回调这个 process 方法。


WatchedEvent

那么 WatchedEvent 又有什么玄机呢?

public class WatchedEvent {
   /**
    * Enumeration of states the ZooKeeper may be at the event
    */
    private final KeeperState keeperState;
   /**
    * Enumeration of types of events that may occur on the ZooKeeper
    */
    private final EventType eventType;
    private String path;
}
复制代码

KeeperStateEventType 是两个枚举类,分别代表通知状态和事件类型。path 就是 client 监听到路径。


常见的 KeeperStateEventType 组合

KeeperState EventType 触发条件 说明
SyncConnected None(-1) 客户端与服务端成功建立会话 客户端和服务端处于连接状态
SyncConnected NodeCreated(1) Watcher 监听对应的数据节点被创建 客户端和服务端处于连接状态
SyncConnected NodeDeleted(2) Watcher 监听对应的数据节点被删除 客户端和服务端处于连接状态
SyncConnected NodeDataChanged(3) Watcher 监听对应的数据节点的内容发生变更(数据内容和数据版本号) 客户端和服务端处于连接状态
SyncConnected NodeChildrenChanged(4) Watcher 监听对应的数据节点的子节点列表发生改变 客户端和服务端处于连接状态

关于 NodeDataChanged 事件类型,这里的变更包括节点的数据内容发生变更,也包括数据的版本号(dataVersion) 变更,所以只要有客户端调用了数据更新接口,不管数据内容是否发生改变、都会导致 dataVersion 发生改变,从而触发对应 Watcher 的监听。这样子就能避免典型乐观锁 ABA 的问题。


WatcherEvent

我们可以在 WatchedEvent 中发现有这么一个方法

/**
     *  Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
    }
复制代码

笼统的说,WatcherEventWatchedEvent 表示的是同一个事物,都是对服务端事件的封装。WatchedEvent 是一个用于逻辑处理的对象、而WatcherEvent 是用于传输的实体对象。从上面的代码我们可以看到,创建 WatcherEvent 的参数就是 WatchedEvent 中各个属性的值。

people.apache.org/~larsgeorge… 中可以看到它实现了 Record 接口

public class WatcherEvent
extends Object
implements org.apache.jute.Record
复制代码


而在 Record 接口中定义了序列化和反序列的方法

@InterfaceAudience.Public
public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
复制代码


相关组件


相关过程

概括可以分为三个过程

  • 客户端注册 Watcher
  • 服务端处理 Watcher
  • 客户端回调 Watcher


客户端注册 Watcher

我们在创建一个ZK 客户端实例对象的时候、可以向构造方法中传入一个默认的 Watcher

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 
复制代码

参数中的这个 Watcher 将会被保存在 ZKWatchManager 中,作为整个会话期间的默认的 Watcher

watchManager.defaultWatcher = watcher;
复制代码

除此之外、ZK 客户端也可以通过 getData,getChildren,exist 三个接口向ZK服务端注册 Watcher


我们以 getData 接口来分析

public byte[] getData(final String path, Watcher watcher, Stat stat){
  .....
}
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return getData(path, getDefaultWatcher(watch), stat);
}
复制代码


如果我们的参数 watch 为 true , 那么 getDefaultWatcher 就是去拿我们创建Zookeeper 时传入的默认的 Watcher

private Watcher getDefaultWatcher(boolean required) {
        if (required) {
            if (watchManager.defaultWatcher != null) {
                return watchManager.defaultWatcher;
            } else {
                throw new IllegalStateException("Default watcher is required, but it is null.");
            }
        }
        return null;
    }
复制代码


下面是 完整的 getData 代码

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
        // the watch contains the un-chroot path
        // 创建 数据类型  的 watch registration
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        // 将客户端change root directory 的路径加上、变回服务端那边正常的路径
        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        // 标记是否有 watcher
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
复制代码
  1. 创建一个 DataWatchRegistration
  2. 转换 path (客户端这边可能 change root directory,发送请求前要将其转为为服务端那边的路径)
  3. 使用 ClientCnxn 提交这个请求
public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null,
            null,
            null,
            null,
            watchRegistration,
            watchDeregistration);
        ....
        ....
        return r;
    }
复制代码


最终这个 Request 被加入到 outgoingQueue中

public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
        Packet packet = null;
        packet = new Packet(h, r, request, response, watchRegistration);
        synchronized (state) {
        ...
              ....
                outgoingQueue.add(packet);
            }
        }
复制代码


最终发送请求到服务端,在 SendThread#readResponse 中处理返回结果

void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            switch (replyHdr.getXid()) {
            case PING_XID:
               ....
               ....
                return;
              case AUTHPACKET_XID:
                ...
                ...
              return;
                // 处理服务端到通知
            case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }
                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                // 加入到事件队列中、由EventThread处理
                eventThread.queueEvent(we);
                return;
            default:
                break;
            }
           // 移除这个Pacjet
            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                }
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                ....
                .....
            } finally {
               // 将Watcher 保存在 ClientWatchManager
                finishPacket(packet);
            }
        }
复制代码


主要做了啥事情

  1. 反序列化,获取请求头中的 XID 判断是否是服务端到通知、如果是的话、加入到事件队列中、由EventThread去处理
  2. 从 outgoingQueue中移除 Packet。
  3. 调用 finishPacket 函数、进行一些后续处理
protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        ...
        ...   
 }
复制代码

最后回到 WatchRegistration 将对应的 Watcher 注册到对应的 Map<String, Set<Watcher>> 中。


服务端处理 Watcher

先来认识几个主要的组件类

WatchManager 是 ZK 服务端 Watcher 的管理者,其内部管理的 watchTablewatch2Paths 两个存储结构,分别用两个维度对 Watcher 进行存储。

  • watchTable 从数据节点路径的粒度来托管 Watcher。
  • watch2Paths 从 Watcher 的粒度来控制事件触发需要触发的数据节点。

ServerCnxn 是一个 Zookeeper 客户端和负担之间的连接接口、代表了一个客户端和服务端的连接,其默认实现是 NIOServerCnxn ,从 3.4.0 开始引入了基于Netty 的实现 NettyServerCnxn

ServerCnxn 同时实现了 Watcher 接口,因此我们可以将其看作是一个 Watcher 对象.

数据节点的路径和 ServerCnxn 都会被存储在 WatchManager


服务端收到客户端的请求后会在 FinalRequestProcessor#processRequest 中判断当前请求是否需要注册 Watcher。

case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
                path = getDataRequest.getPath();
         // 调用处理 getData 请求的方法
                rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
            }
复制代码
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
       ....
        ....
        // 这注意、客户端是否需要注册 Watcher、请求中只是有一个 boolean 字段来表示
        // 从请求中获取是否需要注册 Watcher
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }
复制代码
public byte[] getData(String path, Stat stat, Watcher watcher)  {
        return dataTree.getData(path, stat, watcher);
    }
public byte[] getData(String path, Stat stat, Watcher watcher)  {
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
              // 这里的 dataWatches 就是 IWatchManager 接口对应的实例
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }
复制代码


最终会被放置到 watchTablewatch2Paths 中存储

@Override
    public boolean addWatch(String path, Watcher watcher) {
        return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
    }
    @Override
    public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
        if (isDeadWatcher(watcher)) {
            return false;
        }
    // 从中拿出 Set
        Set<Watcher> list = watchTable.get(path);
        if (list == null) {
            list = new HashSet<>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);
    // 
        Set<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            paths = new HashSet<>();
            watch2Paths.put(watcher, paths);
        }
        watcherModeManager.setWatcherMode(watcher, path, watcherMode);
        return paths.add(path);
    }
复制代码


Watcher 的触发

NodeDataChange 的触发是我们节点的数据内容或者节点的 dataVersion 发生改变。

那么我们可以来看看 org.apache.zookeeper.server.DataTree#setData 方法

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] lastdata = null;
        synchronized (n) {
            lastdata = n.data;
            nodes.preChange(path, n);
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
            nodes.postChange(path, n);
        }
    ....
        ....
        updateWriteStat(path, dataBytes);
     // 调用IWatchManager 的方法
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
复制代码
@Override
    public WatcherOrBitSet triggerWatch(String path, EventType type) {
        return triggerWatch(path, type, null);
    }
    @Override
    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
      // 封装成 WatchedEvent 
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                Set<Watcher> thisWatchers = watchTable.get(localPath);
              // 无监听
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                          // 移除掉
                            iterator.remove();
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                              // 从 watch2Paths 中移除掉
                                paths.remove(localPath);
                            }
                        }
                    }
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
          // 调用 process 方法
            w.process(e);
        }
    .....
        .....
        return new WatcherOrBitSet(watchers);
    }
复制代码


上面已经提及、ServerCnxn 实现了 Watcher 接口,我们看看 org.apache.zookeeper.server.NIOServerCnxn#process

@Override
    public void process(WatchedEvent event) {
      // 请求头中的 XID 设置为 -1,上面分析 SendThread.readResponse 的时候提及过
        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
        // WatchedEvent 变为 WatcherEvent
        WatcherEvent e = event.getWrapper();
    // 给客户端发送通知
        sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
    }
复制代码


基本流程

  • 封装 WatchedEvent
  • watchTable 中找到对应的 Watcher,并将 watchTablewatch2Paths 中相关的 Watcher 和路径清除掉(只能触发一次喔)
  • 调用 process 方法。


客户端回调 Watcher

我们先来认识下 EventThread 这个类


继承自 Thread ,使用 LinkedBlockingQueue<Object> waitingEvents 保存将要处理的事件,然后 ```run`` 方法不断的从队列中获取进行处理。

我们已经知道客户端中由 SendThread#readResponse 处理(这段代码也出现在上面的客户端注册 Watcher 的时候)

case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }
                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                // 加入到事件队列中、由EventThread处理
                eventThread.queueEvent(we);
                return;
复制代码


加入到 ```waitingEvents`` 队列中

public void queueEvent(WatchedEvent event) {
            queueEvent(event, null);
        }
        private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // 从 clientWatchManager 中获取对应的 Watcher,也会从对应的 Map中移除 Watcher
              // 一样是一次性的
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // 加入到 waitingEvents 中、等待 run 方法 拿出来处理
            waitingEvents.add(pair);
        }
复制代码


run 方法

public void run() {
            try {
                isRunning = true;
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        processEvent(event);
                    }
                  ......
                  ......
                }}
        }
        private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // each watcher will process the event
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                          // 调用 process 方法,串行同步处理
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher.", t);
                        }
                    }
                } }
          .......
          .......
    }
复制代码


总结


Watcher 的特性

  • 一次性:无论是客户端还是服务端、一旦 Watcher 触发、都会将其从存储中移除。
  • 客户端串行执行: 串行同步执行的过程、千万不要因为一个 Watcher 而影响整个客户端回调 Watcher
  • 轻量: WatchedEvent 是通知机制中最小的通知单元,只包含了三部分的内容: 通知状态、事件类型、节点路径。而不会将节点的内容以通知的方式告知客户端、而是需要客户端收到通知之后、主动去服务端获取数据。



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5月前
|
存储 负载均衡 算法
深入浅出Zookeeper源码(七):Leader选举
对于一个分布式集群来说,保证数据写入一致性最简单的方式就是依靠一个节点来调度和管理其他节点。在分布式系统中我们一般称其为Leader。
193 6
|
3月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
监控 API
【zookeeper 第四篇章】监控 Watcher
ZooKeeper通过Watcher机制实现了数据的发布/订阅功能。多个订阅者可以监听同一主题对象,一旦该对象状态变化,如节点内容或子节点列表变动,ZooKeeper会实时通知所有订阅者。Watcher架构包括ZooKeeper服务端、客户端及其Watcher管理器。客户端向服务端注册Watcher并保存至本地管理器中;当状态变化时,服务端通知客户端,触发相关Watcher回调处理逻辑。
40 2
|
3月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
5月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
5月前
|
存储 设计模式 算法
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
159 1
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
|
5月前
|
存储 API
深入理解Zookeeper系列-4.Watcher原理
深入理解Zookeeper系列-4.Watcher原理
50 1
|
5月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
117 2
|
5月前
|
网络协议 数据库
深入浅出Zookeeper源码(五):BadVersionException到底是怎么一回事
最近在开发时偶尔会观测到zk报出`BadVersionException`,后在搜索引起上得知了是乐观锁相关的问题,很快就解决了问题。不过学而不思则罔:无论是单体应用还是分布式系统,在运行过程中总要有一种**机制**来保证数据排他性。接下来,我们就来看看zk是如何实现这种**机制**的。
123 1
|
5月前
|
网络协议 数据库
深入浅出Zookeeper源码(三):会话管理
我们知道zookeeper是一个分布式协同系统。在一个大型的分布式系统中,必然会有大量的client来连接zookeeper。那么zookeeper是如何管理这些session的生命周期呢?带着这个问题,我们进入今天的正文。
117 1
下一篇
无影云桌面