深入理解 ZooKeeper客户端与服务端的watcher回调(一)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 2020-02-08 补充本篇博文所描述的watcher回调的流程图

2020-02-08 补充本篇博文所描述的watcher回调的流程图



watcher存在的必要性#


举个特容易懂的例子: 假如我的项目是基于dubbo+zookeeper搭建的分布式项目, 我有三个功能相同的服务提供者,用zookeeper当成注册中心,我的三个项目得注册进zookeeper才能对外暴露服务,但是问题来了,写的java代码怎么才能注册进zookeeper呢?当然加入依赖,写好配置文件再启动就成了,这时,这三个服务体提供者就是zookeeper的客户端了,zookeeper的客户端不止一个,我选择了哪个依赖,就是哪个客户端,光有服务提供者不成啊,对外提供服务,我得需要服务消费者啊,于是用同样的方式,把消费者也注册进zookeeper,zookeeper中就存在了4个node,也就是4个客户端,服务消费者订阅zookeeper,向它拉取服务提供者的address,然后把地址缓存在本地, 进而可以远程调用服务消费者,那么问题又来了,万一哪一台服务提供者挂了,怎么办呢?zookeeper是不是得通知消费者呢? 万一哪一天服务提供者的address变了,是不是也得通知消费者? 这就是watcher存在的意义,它解决了这件事


watcher的类型#


keeperState EventType 触发条件 说明
SyncConnected None(-1) 客户端与服务端建立连接 客户端与服务端处于连接状态
SyncConnected NodeCreate(1) watcher监听的数据节点被创建 客户端与服务端处于连接状态
SyncConnected NodeDeleted(2) Watcher监听的数据节点被删除 客户端与服务端处于连接状态
SyncConnected NodeDataChanged(3) watcher监听的node数据内容发生改变 客户端与服务端处于连接状态
SyncConnected NodeChildrenChange(4) 被监听的数据节点的节点列表发生变更 客户端与服务端处于连接状态
Disconnect None(-1) 客户端与服务端断开连接 客户端与服务端断开连接
Expired (-112) None(-1) 会话超时 session过期,收到异常SessionExpiredException
AuthFailed None(-1) 1.使用了错误的scheme 2,SALS权限验证失败了 收到异常AuthFailedException


实验场景:#


假设我们已经成功启动了zookeeper的服务端和客户端,并且预先添加了watcher,然后使用控制台动态的修改下node的data,我们会发现watcher回调的现象

添加的钩子函数代码如下:


public class ZookepperClientTest {
    public static void main(String[] args) throws Exception {
        ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.err.println("连接,触发");
            }
        });
    Stat stat = new Stat();
     //   todo 下面添加的事件监听器可是实现事件的消费订阅
      String content = new String(client.getData("/node1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // todo 任何连接上这个节点的客户端修改了这个节点的 data数据,都会引起process函数的回调
                // todo 特点1:  watch只能使用1次
                if (event.getType().equals(Event.EventType.NodeDataChanged)){
                    System.err.println("当前节点数据发生了改变");
                }
            }
        }, stat));


看如上的代码, 添加了一个自己的watcher也就是client.getData("/node1", new Watcher() {} 这是个回调的钩子函数,执行时不会运行,当满足的某个条件时才会执行, 比如: node1被删除了, node1的data被修改了


getData做了哪些事情?#


源码如下: getdata,顾名思义,返回服务端的node的data+stat, 当然是当服务端的node发生了变化后调用的

主要主流如下几个工作

  • 创建WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
  • 其实就是一个简单的内部类,将path 和 watch 封装进了一个对象
  • 创建一个request,并且初始化这个request.head=getData=4
  • 调用ClientCnxn.submitRequest(...) , 将现存的这些信息进一步封装
  • request.setWatch(watcher != null);说明他并没有将watcher封装进去,而是仅仅做了个有没有watcher的标记


public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
         // todo 校验path
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            // todo DataWatchRegistration 继承了 WatchRegistration
            // todo DataWatchRegistration 其实就是一个简单的内部类,将path 和 watch 封装进了一个对象
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        final String serverPath = prependChroot(clientPath);
        // todo 创建一个请求头
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        // todo 创建了一个GetDataRequest
        GetDataRequest request = new GetDataRequest();
        // todo 给这个请求初始化,path 是传递进来的path,但是 watcher不是!!! 如果我们给定了watcher , 这里面的条件就是  true
        request.setPath(serverPath);
        request.setWatch(watcher != null); // todo 可看看看服务端接收到请求是怎么办的
        GetDataResponse response = new GetDataResponse();
        // todo 同样由 clientCnxn 上下文进行提交请求, 这个操作应该同样是阻塞的
         // todo EventThread 和 SendThread 同时使用一份 clientCnxn的 submitRequest()
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }


ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的源码我写在下面, 这里来到这个方法中,一眼能看到,它依然是阻塞的式的,并且requet被进一步封装进packet

更重要的是 queuePacket()方法的最后一个参数,存在我们刚刚创建的path+watcher的封装类


public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // todo 来到这个 queuePacket() 方法在下面, 这个方法就是将  用户输入-> string ->>> request ->>> packet 的过程
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);
    // todo 使用同步代码块,在下面的进行    同步阻塞等待, 直到有了Response响应才会跳出这个循环, 这个finished状态就是在客户端接受到服务端的
    // todo 的响应后, 将服务端的响应解析出来,然后放置到 pendingqueue里时,设置上去的
    synchronized (packet) {
        while (!packet.finished) {
            // todo 这个等待是需要唤醒的
            packet.wait();
        }
    }
    // todo 直到上面的代码块被唤醒,才会这个方法才会返回
    return r;
}


同样,在queuePacket()方法中将packet提交到outgoingQueue中,最终被seadThread消费发送到服务端


服务端如何处理watchRegistration不为空的packet#


后续我准备用一整篇博客详解单机模式下服务端处理请求的流程,所以这篇博客只说结论

在服务端,用户的请求最终会按顺序流向三个Processor,分别是

  • PrepRequestProcessor
  • 负责进行一些状态的修改
  • SyncRequestProcessor
  • 将事务日志同步到磁盘
  • FinalRequestProcessor
  • 相应用户的请求

我们直接去看FinalRequestProcessorpublic void processRequest(Request request) {}方法,看他针对getData()方式的请求做出了哪些动作.下面来了个小高

潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟进watcher的有无给服务端添加不同的Watcher

真的得划重点了,当我发现这一点时,我的心情是超级激动的,就像发现了新大陆一样


case OpCode.getData: {
        lastOp = "GETD";
        GetDataRequest getDataRequest = new GetDataRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                getDataRequest);
        DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ,
                request.authInfo);
        Stat stat = new Stat();
        // todo 这里的操作    getDataRequest.getWatch() ? cnxn : null 对应可客户端的  跟进watcher有没有而决定往服务端传递 true 还是false 相关
        // todo 跟进去 getData()
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                getDataRequest.getWatch() ? cnxn : null);
        //todo  cnxn的Processor()被回调, 往客户端发送数据 , 什么时候触发呢? 就是上面的  处理事务时的回调 第127行
        // todo 构建了一个 rsp ,在本类的最后面将rsp 响应给client
        rsp = new GetDataResponse(b, stat);
        break;
    }


继续跟进这个getData()在服务端维护了一份path+watcher的map


public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            // todo 将path 和 watcher 绑定在一起
            dataWatches.addWatch(path, watcher);
        }
        return n.data;
    }
}


客户端打开命令行,修改服务端node的状态#


书接上回,当客户单的代码去创建ClientCnxn时,有下面的逻辑 , 它开启了两条守护线程, sendThread负责向服务端发送心跳,已经和服务端进行用户相关的IO交流, EventThread就负责和txn事务相关的处理逻辑,级别上升到针对node


// todo start就是启动了在构造方法中创建的线程
    public void start() {
        sendThread.start();
        eventThread.start();
    }


到目前为止,客户端就有如下三条线程了

  • 负责处理用户在控制台输入命令的主线程
  • 守护线程1: seadThread
  • 守护线程2: eventThread


跟进主线程的处理用户输入部分的逻辑代码如下:

下面的代码的主要逻辑就是处理用户输入的命令,当通过if-else选择分支判断用户到底输入的啥命令


按照我们的假定的场景,用户输入的命令是这样的 set /path newValue 所以,毫无疑问,经过解析后代码会去执行下面的stat = zk.setData(path, args[2].getBytes(),部分


// todo zookeeper客户端, 处理用户输入命令的具体逻辑
    // todo  用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端
    // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类
    protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
        // todo 在这个方法中可以看到很多的命令行所支持的命令
        Stat stat = new Stat();
        // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令  1 2 3 位置可能就是不同的参数
        String[] args = co.getArgArray();
        String cmd = co.getCommand();
        if (args.length < 1) {
            usage();
            return false;
        }
        if (!commandMap.containsKey(cmd)) {
            usage();
            return false;
        }
        boolean watch = args.length > 2;
        String path = null;
        List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
        LOG.debug("Processing " + cmd);
        if (cmd.equals("quit")) {
            System.out.println("Quitting...");
            zk.close();
            System.exit(0);
        } else if (cmd.equals("set") && args.length >= 3) {
            path = args[1];
            stat = zk.setData(path, args[2].getBytes(),
                    args.length > 3 ? Integer.parseInt(args[3]) : -1);
            printStat(stat);


继续跟进stat = zk.setData(path, args[2].getBytes(),下面的逻辑也很简单,就是将用户的输入封装进来request中,通过ClientCnxn类的submit方法提交到一个队列中,等待着sendThread去消费


这次有目的的看一下submitRequest的最后一个参数为null, 这个参数是WatchRegistration的位置,一开始置为null


public Stat setData(final String path, byte data[], int version)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.setData);
        SetDataRequest request = new SetDataRequest();
        request.setPath(serverPath);
        request.setData(data);
        request.setVersion(version);
        SetDataResponse response = new SetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }


跟进这个submitRequest()方法, 源码如下,不处所料的是,它同样被阻塞住了,直到服务端给了它响应


当前代码的主要逻辑就是将request封装进packet,然后将packet添加到ClintCnxn维护的outgoingQueue队列中等待sendThread的消费


这次来到这个方法是因为我们在控制台输入的set 命令而触发的,比较重要的是本次packet携带的WatchRegistration==null, 毫无疑问,这次服务端在FinalRequestProcessor中再处理时取出的watcher==null, 也就不会将path+watcher保存进maptable中

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
26 0
|
3月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
14天前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
34 1
|
14天前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
32 1
|
14天前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
34 1
|
1月前
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
2月前
|
监控 API
【zookeeper 第四篇章】监控 Watcher
ZooKeeper通过Watcher机制实现了数据的发布/订阅功能。多个订阅者可以监听同一主题对象,一旦该对象状态变化,如节点内容或子节点列表变动,ZooKeeper会实时通知所有订阅者。Watcher架构包括ZooKeeper服务端、客户端及其Watcher管理器。客户端向服务端注册Watcher并保存至本地管理器中;当状态变化时,服务端通知客户端,触发相关Watcher回调处理逻辑。
53 2
|
3月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
5月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
66 11
|
5月前
|
存储
ZooKeeper客户端常用命令
ZooKeeper客户端常用命令
62 1