ZooKeeper源码阅读系列-zk-client解析

简介: 忘记了,ZK还有一章没有解读完,所以今天继续ZK的源码解读。本篇文章会研究执行zkCli-server后 ,这条命令到底发生了什么,客户端是怎么和服务端建立连接的,建立连接之后客户端发送请求命令是怎么处理的。

微信截图_20220531131804.png

前言

忘记了,ZK还有一章没有解读完,所以今天继续ZK的源码解读。本篇文章会研究执行zkCli-server后 ,这条命令到底发生了什么,客户端是怎么和服务端建立连接的,建立连接之后客户端发送请求命令是怎么处理的。下图就是ZK连接时候发生的心跳检测。微信截图_20220531134141.png再结合看下zk客户端和服务端的流程图,研究源码要有图,并且结合流程图去看代码才能更容易理解微信截图_20220531134204.pngzkCli调用的就是zkCli.sh脚本,我们看下这个脚本是干嘛的

ZOOBIN="${BASH_SOURCE-$0}"
        ZOOBIN="$(dirname "${ZOOBIN}")"
        ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
        if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
        . "$ZOOBINDIR"/../libexec/zkEnv.sh
        else
        . "$ZOOBINDIR"/zkEnv.sh
        fi
        "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
        -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS
        org.apache.zookeeper.ZooKeeperMain "$@"
复制代码

如果看不懂脚本的小伙伴,可以去看看ZooKeeperMain这个类也会明白会和ZooKeeperMain这个类有关,没错,zkCli命令就是调用了ZooKeeperMain.main()函数方法,main函数会解析输入的命令-server最终会调用下边这个方法。微信截图_20220531134233.png从上面的代码我们可以看出zkCli-server命令其实就是创建了一个Zookeeper实例,跟进ZooKeeper的构造函数微信截图_20220531134313.png

梳理一下构造函数做的事情

1.将传入的watcher设置到watchManager的defaultWatcher
2.构造地址解析器,解析ip:port
3.获取服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中
4.ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互
5.启动网络连接器ClientCnxn

接下来看一下几个主要方法的解读

1.地址解析器ConnectStringParser,它的主要对zk传入的connectString做两个处理:解析rootpath以及保存服务器地址列表chrootpath: zk允许每个客户端为自己设置已给命名空间。如果一个zookeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在自己的命名空间下。
2.HostProvider,HostProvider的实现类是StaticHostProvider,内部对解析出来的地址和端口封装为InetSocketAddress,并提供next方法返回一个服务器地址供客户端连接ClientCnxn,这个上面有提到过。
3.客户端服务端通信协议Packet,Packet是ClientCnxn的一个内部类,它也是客户端和服务端的通信协议
4.Packet协议中用于序列化网络传输的只有requestHeader,request,readOnly几个字段。ClientCnxn客户端连接服务端核心类ClientCnxn文件的代码很长也是整个Client的I/O代码主要分为三部分

1)I/O通信层ClientCnxnSocket 2)I/O核心调度线程ClientCnxn.SendThread 3)ClientCnxn.EventThread客户端事件处理线程 以上三个部分也是客户端可服务端建立连接以及客户端发送请求,处理服务端响应的核心方法。

5.doIO()处理I/O事件

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
     throws InterruptedException, IOException {
  SocketChannel sock = (SocketChannel) sockKey.channel();//获取Channel
  if (sock == null) {
     throw new IOException("Socket is null!");
  }
  if (sockKey.isReadable()) {//读就绪
     int rc = sock.read(incomingBuffer);//读取长度
     if (rc < 0) { ///如果<0,表示读到末尾了,eof了异常关闭了
        throw new EndOfStreamException(
              "Unable to read additional data from server sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely server has closed socket");
     }
     if (!incomingBuffer.hasRemaining()) {//如果还有数据
        incomingBuffer.flip(); //切换模式
        if (incomingBuffer == lenBuffer) {
           recvCount++;//新增接收次数
           readLength();//获取len并给incomingBuffer分配对应空间
        } else if (!initialized) {//如果连接还未初始化
           readConnectResult();//读取connect并回复
           enableRead();//启用读
           if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
              //如果有可以发送的packet
              enableWrite();//允许写,因为有要发送的packet
           }
           lenBuffer.clear();
           incomingBuffer = lenBuffer;
           updateLastHeard();
           initialized = true;//初始化完成
        } else {//如果已连接,并且已经给incomingBuffer分配了对应len的空间
           sendThread.readResponse(incomingBuffer);//读取response
           lenBuffer.clear();
           incomingBuffer = lenBuffer;
           updateLastHeard();//更新上次接收时间
        }
     }
  }
  //如果是写事件
  if (sockKey.isWritable()) {
     synchronized(outgoingQueue) {//锁住队列
        Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以发送的packet
        if (p != null) {
           updateLastSend();
           if (p.bb == null) {//packet内部还未生成bytebuffer
              if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != OpCode.ping) &&
                    (p.requestHeader.getType() != OpCode.auth)) {
                 p.requestHeader.setXid(cnxn.getXid());
              }
              p.createBB();
           }
           sock.write(p.bb);//发送到服务端
           if (!p.bb.hasRemaining()) {
              sentCount++;
              outgoingQueue.removeFirstOccurrence(p);
              if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                 synchronized (pendingQueue) {
                    pendingQueue.add(p);
                 }
              }
           }
        }
        if (outgoingQueue.isEmpty()) {
           disableWrite();//如果没有要发的就禁止写
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
           disableWrite();.//禁止写
        } else {
           // Just in case
           enableWrite();//开启写
        }
     }
  }
}
复制代码

总结下doIO主要分为读/写俩种方式处理

1.读:
1)_ 判断是否完成初始化,如果未完成初始化就完成初始化
2)读取len再给incomingBuffer分配对应空间
3) 读取response,比较特殊的是读会产生俩次,第一次会读取buffer长度,分配完空间,第二次会读取剩下的数据,可能是这样处理拆包吧
2.写:
1) 找到需要发送的packet
2) 生成需要发送的byteBuffer
3) 将buffer写入channel
4) 将packet从outgoingQueue移除并添加到pendingQueue等待server的响应

另外就是ClientCnxn.SendThread核心调度线程中的读取服务端响应readResponse(),sendPing()发送心跳,startConnect()建立连接,这里由于时间关系放下章解读。

最后在总结下整个流程

1. ZooKeeperMain解析zk命令,如果是-server则会实例化Zookeeper对象
2. 解析传入的server的地址并保存在服务器地址管理器HostProvider中
3. 创建网络连接器ClientCnxn,同时初始化核心队列outGoingQueue和pendingQueue,分别客户端的请求队列和等待响应队列
4. 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情
5. 启动SendThread和EventThread
6. SendThread从HostProvider中获取一个server地址,然后交给ClientCnxnSocket创建一个tcp长连接连接server
7. 连接创建完成,客户端开始发送请求,主要是ClientCnxnSocket负责从outgoingQueue中取出Packet对象,序列化成ByteBuffer后,向服务端进行发送
8. 接收server响应,通过doIO方法调用readResponse()处理响应
9. readResponse()将事件转发EventThread触发回调或者Watcher

后话

ZK的源码确实必其他开源难解读得多,但是收获很大,要相信一切的努力都是值得的,加油。这也是Zk源码解读系列的最后一篇,还是那个道理吧,得反反复复的看才能找到新的东西。

目录
相关文章
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
561 2
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
923 140
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1497 29
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
584 4
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
4132 1
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
1387 2
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析