深入理解 ZooKeeper单机客户端的启动流程(三)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 深入理解 ZooKeeper单机客户端的启动流程(三)

客户端的阻塞式等待 -- 自旋锁#


跟进submitRequest()


// todo 这是ClientCnxn的类, 提交请求, 最终将我们的请求传递到socket
    // todo 返回一个header, 因为根据它判断是否是否出错了
    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // todo 来到这个 queuePacket() 方法在下面, 这个方法就是将  用户输入-> string ->>> request ->>> packet 的过程
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        // todo 使用同步代码块,在下面的进行    同步阻塞等待, 直到有了Response响应才会跳出这个循环, 这个finished状态就是在客户端接受到服务端的
        // todo 的响应后, 将服务端的响应解析出来,然后放置到 pendingqueue里时,设置上去的
        synchronized (packet) {
            while (!packet.finished) {
                // todo 这个等待是需要唤醒的
                packet.wait();
            }
        }
        // todo 直到上面的代码块被唤醒,才会这个方法才会返回
        return r;
    }


在上面的代码中,可以看到可以他是使用一个while(!packet,finishes){} 来阻塞程序的, 刚看看到用户的命令被封装进了request, 接下来, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封装进packet,然后添加到outgoingqueue队列中,源码如下


// todo
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        // Note that we do not generate the Xid for the packet yet.
        // todo 它会为我们的没有 Xid  的packet生成 Xid
        // It is generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // todo 她会在ClientCnxnSocket::doIO()之后生成
        // where the packet is actually sent.
        // todo packet实际生成的位置
        synchronized (outgoingQueue) {
            // todo 将用户传递过来的信息包装成 Packet
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                // todo 如果客户端正在发送关闭session的请求, 就标记成 closing = true
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // todo 将packet 添加到队列里面
                // todo 这个什么时候会被消费呢? 是在sendthread的无限循环中被消费的, 因为那是第二条线程
                outgoingQueue.add(packet);
            }
        }
        // todo getClientCnxnSocket() 获取ClientCnxnSocket对象
        // todo wakeupCnxn() 是 ClientCnxnSocket对象 中的抽象方法, 实现类是 ClientCnxnSocket的实现类ClientCnxnSocketNio
        // 唤醒阻塞在selector.select上的线程,让该线程及时去处理其他事情,比如这里的让sendThread 干净去消费packet 
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }


在这个方法的最后一行,点睛,selector.wakeup(); 就是通知选择器,别再阻塞select了,赶紧去做其他工作


因为选择器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代码贴出来如下


服务端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封装类的,SendThread同样也是,它可以使用


现在再看,唤醒selector 让他去做其他事 ,其实即使doIO(),这个方法代码其实我在上面贴出来过,就是分成两大部分,读就绪与写就绪


// todo 往服务端发送 packet
    //todo 下面就是NIO 网络编程的逻辑了
    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // todo 选择器在waitTimeOut时间内阻塞轮询== 上一次计算的 to时间
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        // todo 获取channel注册进selector时返回的key
        synchronized (this) {
            selected = selector.selectedKeys();
        }
            // Everything below and until we get back to the select is non blocking,
            //  so time is effectively a constant. That is  Why we just have to do this once, here
        // todo 直到我们重新回到select之前, 下面的全部操作都是非阻塞的
        // todo 因此时间只是一个常数, 那就是为什么我们在这里用下面的函数
        updateNow();
        //
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // todo 建立连接的逻辑
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // todo 往服务端发送数据的逻辑 , 方法在上面的64行
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }


写到这里其实已经把整个过程顺下来了,下面再重新看看,sendThread是如果消费packet并且修改然后得到服务端的响应,修改pakcet.finished属性的, 因为现在主线的submitRequest还在阻塞呢


往服务端写#


客户端的socket的实现类是ClientCnxnSocketNio, 它往服务端写的逻辑如下, 不难看出使用的java原生的sock.write(p.bb); // 发送服务端 , 亮点是后面的操作pendingQueue.add(p);被写过的packet被添加到了pengingqueue中


if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
    // todo 查询出可发送的packet
    Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
    if (p != null) {
        updateLastSend();
        // If we already started writing p, p.bb will already exist
        if (p.bb == null) {
            if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != OpCode.ping) &&
                    (p.requestHeader.getType() != OpCode.auth)) {
                p.requestHeader.setXid(cnxn.getXid());
            }
            p.createBB();
        }
        // todo 往服务端发送数据 packet.ByteBuf
        sock.write(p.bb); // 发送服务端
        if (!p.bb.hasRemaining()) { //todo !hasRemaining  没有剩余的数据
            sentCount++;
            // todo 将发送过的packet从outgoingqueue移除
            outgoingQueue.removeFirstOccurrence(p);
            if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                synchronized (pendingQueue) {
                    // todo 如果刚才的请求头的类型不是null , 不是ping ,不是权限验证 就把packet添加到 pendingQueue
                    /**
                     * These are the packets that have been sent and are waiting for a response.
                     * todo 这个penddingQueue 存放的是已经发送的 和 等待服务器响应的packet
                     */
                    pendingQueue.add(p);
                }
            }
        }


上面说了, 为啥被使用过的pakcet还要保留一份呢? 还是那个原因,主线程还因为pakcet的finish状态未被该变而阻塞呢, 那什么时候改变呢? 答案是受到服务端的响应之后改变,在哪里收到呢? 就是DoIo()的读就绪模块,下面附上源码,它的解析我写在这段代码下面


从服务端读#


if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
    throw new EndOfStreamException(
            "Unable to read additional data from server sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
    // todo 返回buffer
    incomingBuffer.flip();
    if (incomingBuffer == lenBuffer) {
        recvCount++;
        readLength();
    } else if (!initialized) { //todo 连接有没有初始化, 来这之前被改成了 flase ,现在
        // todo 读取服务端发给我的连接请求的结果
        readConnectResult(); // primeConnection()
        enableRead();
        if (findSendablePacket(outgoingQueue,
                cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
            // Since SASL authentication has completed (if client is configured to do so),
            // outgoing packets waiting in the outgoingQueue can now be sent.
            enableWrite();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
        updateLastHeard();
        initialized = true;
    } else {
        //todo 如果已经初始化了, 就来这里读取响应, 跟进去
        sendThread.readResponse(incomingBuffer);
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
        updateLastHeard();
    }


如上代码的最后部分,sendThread.readResponse(incomingBuffer); 下面是它的源码,它首先是从buffer中读取出服务端的发送的数据,然后一通解析,封装进pendingqueue的packet中,并且在方法的最后部分终于完成了状态的修改


// todo 同样是 sendThread的方法, 读取响应
        // todo 是经过flip 反转后的  可读的buffer
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            // todo ---------------------   从服务端写回来的buffer中解析封装成  ReplyHeader   ----------------------------
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            // todo ---------------------------------------------------------------------
            // todo 下面根据 ReplyHeader 的 xid 判断响应的结果类型
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );                                
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                      LOG.warn("Got server path " + event.getPath()
                          + " which is too short for chroot path "
                          + chrootPath);
                    }
                }
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );
                return;
            }
            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (clientTunneledAuthenticationInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }
            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                // todo 从pendingQueue 中取出第一个packet
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response to the first request!
             * // todo 因为请求存在队列中,是有顺序的, 因此我们最好对第一个做出相应
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }
                // todo 把todo 从服务端解析出来的结果赋值给 pendingQueue 中的packet
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                // todo 跟进这个方法
                finishPacket(packet);
            }
        }


解开客户端的阻塞状态#


进入finishPacket(packet)


// todo ClientCnxn 也就是本类中, 在根据用户的输入向服务端提交命令后的那个 wait唤醒了, finished=true,使得原来的while循环退出了
            private void finishPacket(Packet p) {
                if (p.watchRegistration != null) {
                    p.watchRegistration.register(p.replyHeader.getErr());
                }
                // todo 唤醒 Zookeeper中 submitRequest() 提交请求后的阻塞操作, 现在拿到请求后进行唤醒
                if (p.cb == null) {
                    synchronized (p) {
                        //todo   将这个finish 改成true, 在
                        p.finished = true;
                        p.notifyAll();
                    }
                } else {
                    p.finished = true;
            eventThread.queuePacket(p);
        }
    }


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
33 0
|
1月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
62 1
|
2月前
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
4月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
6月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
74 11
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
4月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
43 2