深入理解 ZooKeeper客户端与服务端的watcher回调(二)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 深入理解 ZooKeeper客户端与服务端的watcher回调(二)

重要:发送事务消息#


FinalRequestProcessorpublic void processRequest(Request request) {}方法中,有如下代码


//todo 请求头不为空
    if (request.hdr != null) {
        // 获取请求头
       TxnHeader hdr = request.hdr;
       // 获取事务
       Record txn = request.txn;
        // todo 跟进这个方法-----<--!!!!!!-----处理事务的逻辑,在这里面有向客户端发送事件的逻辑, 回调客户端的watcher----!!!!!!-->
       rc = zks.processTxn(hdr, txn);
    }


继续跟进去


// todo 处理事物日志
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // todo 继续跟进去!!!!!!!!!
    // todo 跟进 processTxn(hdr, txn)
    rc = getZKDatabase().processTxn(hdr, txn);


跟进ZkDatabase.java中的processTxn(hdr, txn)方法


public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    // todo 跟进 processTxn
    return dataTree.processTxn(hdr, txn);
}


跟进到DataTree.java


public ProcessTxnResult processTxn(TxnHeader header, Record txn)
    {
        ProcessTxnResult rc = new ProcessTxnResult();
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) { // todo 根据客客户端发送过来的type进行switch,
                case OpCode.create:
                    CreateTxn createTxn = (CreateTxn) txn;
                    rc.path = createTxn.getPath();
                    // todo  跟进这个创建节点的方法
                    createNode(
                            createTxn.getPath(),


根据请求头的值,进而判断出走到那个switch的分支,当前我们在控制台触发,进入到setData分支如下:跟进这个方法中可以看到它主要做了如下几件事

  • 使用传递进来的新值替代旧data
  • dataWatches.triggerWatch(path, EventType.NodeDataChanged);**触发指定的事件watch,什么事件呢? NodeDataChange, 触发了哪个watcher呢? 跟进去查看 **


//todo  setData
    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            // todo 修改内存的数据
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        // todo 终于 看到了   服务端 关于触发NodeDataChanged的事件
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }


补充Watch & EventType 类图#


网络异常,图片无法展示
|


跟进去dataWatches.triggerWatch(path, EventType.NodeDataChanged);,源码如下, 主要的逻辑就是取出存放在服务端的watch,然后逐个回调他们的processor函数,问题来了,到底是哪些watcher呢? 其实就是我们在客户端启动时添加getData()时存进去的wather,也就是ServerCnxn


// todo 跟进去服务端的 触发事件,  但是吧, 很纳闷. 就是没有往客户端发送数据的逻辑
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // todo 继续跟进去, 看它如何回调的
            w.process(e);
        }
        return watchers;
    }


怀着激动的心情去看看ServerCnxn的process()方法做了什么事?


来到ServerCnxn的实现类NIOServerCnxn, 确实很激动,看到了服务端在往客户端发送事务型消息, 并且new ReplyHeader(-1, -1L, 0)第一个位置上的参数是-1, 这一点很重要,因为客户端在接受到这个xid=-1的标记后,就会将这条响应交给EventThread处理


@Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        // todo  往服务端发送了 e event类型消息
        sendResponse(h, e, "notification");
    }


处理回调回调watch使用的响应#


进入到SendThread的读就绪源码部分,如下: 它根据header.xid=-1就知道了这是事务类型的响应


// todo 服务端抛出来的事件, 客户端将把他存在EventThread的 watingEvents 队列中
// todo 它的实现逻辑也是这样, 会有另外一个线程不断的消费这个队列
if (replyHdr.getXid() == -1) {
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
    }
    // todo 创建watcherEvent 并将服务端发送回来的数据,反序列化进这个对象中
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    // convert from a server path to a client path
    // todo 将server path 反转成 client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
              WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                //todo 跟进去
                eventThread.queueEvent(we);
                return;
            }
    }


在这个方法的最后,将这个相应添加进EventThread消费的队列中,跟进 eventThread.queueEvent(we);


// todo
public void queueEvent(WatchedEvent event) {
    // todo 如果事件的类型是 none, 或者sessionState =  直接返回
    /**
     *   todo 事件的类型被设计成 watcher 接口的枚举
     *   None (-1),
     *   NodeCreated (1),
     *   NodeDeleted (2),
     *   NodeDataChanged (3),
     *   NodeChildrenChanged (4);
     */
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    // materialize the watchers based on the event
    // todo 根据事件的具体类型,将观察者具体化, 跟进去
    // todo 这个类是ClientCnxn的辅助类,作用就是将watcher 和它观察的事件封装在一起
    WatcherSetEventPair pair = new WatcherSetEventPair(
            //todo 跟进这个 materialize方法. 其实就是从map中取出了和当前client关联的全部 watcher set
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);
    // queue the pair (watch set & event) for later processing
    // todo 将watch集合 和 event 进行排队(按顺序添加到队列里了), 以便后续处理 , 怎么处理呢?  就在EventThread的run循环中消费
    // todo watingEvent ==>  LinkedBlockingQueue<Object>
    waitingEvents.add(pair);
}


上面的代码主要做了如下几件事:

  • 从map中取出和当前事件相关的全部watcher
  • 将watcher set 添加进 waitingEvents队列中,等待EventThead的消费

跟进 watcher.materialize(event.getState(), event.getType(), 会追到下面的代码


case NodeDataChanged: // todo node中的data改变和 nodeCreate 都会来到下面的分支
        case NodeCreated:
            synchronized (dataWatches) {
                // todo dataWatches 就是刚才存放  path : watcher 的map
                // todo dataWatches.remove(clientPath) 移除并返回clientPath对应的watcher , 放入 result 中
                addTo(dataWatches.remove(clientPath), result);
            }


上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除并返回指定的watcher,这也说明了,为什么zk原生客户端添加的watcher仅仅会回调一次


EventThread是如何消费waitingEvents的#


EventThread是一条守护线程, 因此它拥有自己的不断在运行的run方法,它就是在这个run方法中对这个队列进行消费的


@Override
        public void run() {
            try {
                isRunning = true;
                // todo 同样是无限的循环
                while (true) {
                    // todo 从watingEvnets 中取出一个 WatcherSetEventPair
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        // todo 本类方法,处理这个事件,继续进入,方法就在下面
                        processEvent(event);
                    }
                    if (wasKilled)
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                }
            } catc


继续跟进它的processEvent(event),最终会在这个方法中调用下面的代码,这里的watcher就是我在本篇博客的开始位置添加进去的watcher,至此打完收工


watcher.process(pair.event);


总结:#


当客户端启动时添加watcher对某一个特定path上的node进行监听时 , 客户端的watcher被封装进WatcherRegistion中再进一步发送的服务端

watcher不为空的packet达到服务端后会被巧妙的处理,将ServerCnxn当成watcher注册添加到服务端维护的那份watcher map table中

当watcher关联的node发生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged时,在最后一个处理器就会触发发送事务类型事件的动作,其实就是回调ServerCnxn的process()方法

事务类型的响应返回到客户端,跟进xid区分出到底是哪种响应,如-1是NodeDataChanged,最终会把这个事务事件提交到EventThread消费的waitingEvents等待EventThread消费它,回调客户端的watcher的process()方法

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
33 0
|
4月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
1月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
61 1
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
39 1
|
1月前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
41 1
|
2月前
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
3月前
|
监控 API
【zookeeper 第四篇章】监控 Watcher
ZooKeeper通过Watcher机制实现了数据的发布/订阅功能。多个订阅者可以监听同一主题对象,一旦该对象状态变化,如节点内容或子节点列表变动,ZooKeeper会实时通知所有订阅者。Watcher架构包括ZooKeeper服务端、客户端及其Watcher管理器。客户端向服务端注册Watcher并保存至本地管理器中;当状态变化时,服务端通知客户端,触发相关Watcher回调处理逻辑。
70 2
|
4月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
6月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
74 11
|
6月前
|
存储
ZooKeeper客户端常用命令
ZooKeeper客户端常用命令
66 1