ZooKeeper源码阅读系列-zk-client解析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 忘记了,ZK还有一章没有解读完,所以今天继续ZK的源码解读。本篇文章会研究执行zkCli-server后 ,这条命令到底发生了什么,客户端是怎么和服务端建立连接的,建立连接之后客户端发送请求命令是怎么处理的。

微信截图_20220531131804.png

前言

忘记了,ZK还有一章没有解读完,所以今天继续ZK的源码解读。本篇文章会研究执行zkCli-server后 ,这条命令到底发生了什么,客户端是怎么和服务端建立连接的,建立连接之后客户端发送请求命令是怎么处理的。下图就是ZK连接时候发生的心跳检测。微信截图_20220531134141.png再结合看下zk客户端和服务端的流程图,研究源码要有图,并且结合流程图去看代码才能更容易理解微信截图_20220531134204.pngzkCli调用的就是zkCli.sh脚本,我们看下这个脚本是干嘛的

ZOOBIN="${BASH_SOURCE-$0}"
        ZOOBIN="$(dirname "${ZOOBIN}")"
        ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
        if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
        . "$ZOOBINDIR"/../libexec/zkEnv.sh
        else
        . "$ZOOBINDIR"/zkEnv.sh
        fi
        "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
        -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS
        org.apache.zookeeper.ZooKeeperMain "$@"
复制代码

如果看不懂脚本的小伙伴,可以去看看ZooKeeperMain这个类也会明白会和ZooKeeperMain这个类有关,没错,zkCli命令就是调用了ZooKeeperMain.main()函数方法,main函数会解析输入的命令-server最终会调用下边这个方法。微信截图_20220531134233.png从上面的代码我们可以看出zkCli-server命令其实就是创建了一个Zookeeper实例,跟进ZooKeeper的构造函数微信截图_20220531134313.png

梳理一下构造函数做的事情

1.将传入的watcher设置到watchManager的defaultWatcher
2.构造地址解析器,解析ip:port
3.获取服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中
4.ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互
5.启动网络连接器ClientCnxn

接下来看一下几个主要方法的解读

1.地址解析器ConnectStringParser,它的主要对zk传入的connectString做两个处理:解析rootpath以及保存服务器地址列表chrootpath: zk允许每个客户端为自己设置已给命名空间。如果一个zookeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在自己的命名空间下。
2.HostProvider,HostProvider的实现类是StaticHostProvider,内部对解析出来的地址和端口封装为InetSocketAddress,并提供next方法返回一个服务器地址供客户端连接ClientCnxn,这个上面有提到过。
3.客户端服务端通信协议Packet,Packet是ClientCnxn的一个内部类,它也是客户端和服务端的通信协议
4.Packet协议中用于序列化网络传输的只有requestHeader,request,readOnly几个字段。ClientCnxn客户端连接服务端核心类ClientCnxn文件的代码很长也是整个Client的I/O代码主要分为三部分

1)I/O通信层ClientCnxnSocket 2)I/O核心调度线程ClientCnxn.SendThread 3)ClientCnxn.EventThread客户端事件处理线程 以上三个部分也是客户端可服务端建立连接以及客户端发送请求,处理服务端响应的核心方法。

5.doIO()处理I/O事件

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
     throws InterruptedException, IOException {
  SocketChannel sock = (SocketChannel) sockKey.channel();//获取Channel
  if (sock == null) {
     throw new IOException("Socket is null!");
  }
  if (sockKey.isReadable()) {//读就绪
     int rc = sock.read(incomingBuffer);//读取长度
     if (rc < 0) { ///如果<0,表示读到末尾了,eof了异常关闭了
        throw new EndOfStreamException(
              "Unable to read additional data from server sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely server has closed socket");
     }
     if (!incomingBuffer.hasRemaining()) {//如果还有数据
        incomingBuffer.flip(); //切换模式
        if (incomingBuffer == lenBuffer) {
           recvCount++;//新增接收次数
           readLength();//获取len并给incomingBuffer分配对应空间
        } else if (!initialized) {//如果连接还未初始化
           readConnectResult();//读取connect并回复
           enableRead();//启用读
           if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
              //如果有可以发送的packet
              enableWrite();//允许写,因为有要发送的packet
           }
           lenBuffer.clear();
           incomingBuffer = lenBuffer;
           updateLastHeard();
           initialized = true;//初始化完成
        } else {//如果已连接,并且已经给incomingBuffer分配了对应len的空间
           sendThread.readResponse(incomingBuffer);//读取response
           lenBuffer.clear();
           incomingBuffer = lenBuffer;
           updateLastHeard();//更新上次接收时间
        }
     }
  }
  //如果是写事件
  if (sockKey.isWritable()) {
     synchronized(outgoingQueue) {//锁住队列
        Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以发送的packet
        if (p != null) {
           updateLastSend();
           if (p.bb == null) {//packet内部还未生成bytebuffer
              if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != OpCode.ping) &&
                    (p.requestHeader.getType() != OpCode.auth)) {
                 p.requestHeader.setXid(cnxn.getXid());
              }
              p.createBB();
           }
           sock.write(p.bb);//发送到服务端
           if (!p.bb.hasRemaining()) {
              sentCount++;
              outgoingQueue.removeFirstOccurrence(p);
              if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                 synchronized (pendingQueue) {
                    pendingQueue.add(p);
                 }
              }
           }
        }
        if (outgoingQueue.isEmpty()) {
           disableWrite();//如果没有要发的就禁止写
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
           disableWrite();.//禁止写
        } else {
           // Just in case
           enableWrite();//开启写
        }
     }
  }
}
复制代码

总结下doIO主要分为读/写俩种方式处理

1.读:
1)_ 判断是否完成初始化,如果未完成初始化就完成初始化
2)读取len再给incomingBuffer分配对应空间
3) 读取response,比较特殊的是读会产生俩次,第一次会读取buffer长度,分配完空间,第二次会读取剩下的数据,可能是这样处理拆包吧
2.写:
1) 找到需要发送的packet
2) 生成需要发送的byteBuffer
3) 将buffer写入channel
4) 将packet从outgoingQueue移除并添加到pendingQueue等待server的响应

另外就是ClientCnxn.SendThread核心调度线程中的读取服务端响应readResponse(),sendPing()发送心跳,startConnect()建立连接,这里由于时间关系放下章解读。

最后在总结下整个流程

1. ZooKeeperMain解析zk命令,如果是-server则会实例化Zookeeper对象
2. 解析传入的server的地址并保存在服务器地址管理器HostProvider中
3. 创建网络连接器ClientCnxn,同时初始化核心队列outGoingQueue和pendingQueue,分别客户端的请求队列和等待响应队列
4. 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情
5. 启动SendThread和EventThread
6. SendThread从HostProvider中获取一个server地址,然后交给ClientCnxnSocket创建一个tcp长连接连接server
7. 连接创建完成,客户端开始发送请求,主要是ClientCnxnSocket负责从outgoingQueue中取出Packet对象,序列化成ByteBuffer后,向服务端进行发送
8. 接收server响应,通过doIO方法调用readResponse()处理响应
9. readResponse()将事件转发EventThread触发回调或者Watcher

后话

ZK的源码确实必其他开源难解读得多,但是收获很大,要相信一切的努力都是值得的,加油。这也是Zk源码解读系列的最后一篇,还是那个道理吧,得反反复复的看才能找到新的东西。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
103 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
90 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
69 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
75 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
101 0
|
20天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
20天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
20天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
20天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
61 12

推荐镜像

更多