前言
忘记了,ZK还有一章没有解读完,所以今天继续ZK的源码解读。本篇文章会研究执行zkCli-server后 ,这条命令到底发生了什么,客户端是怎么和服务端建立连接的,建立连接之后客户端发送请求命令是怎么处理的。下图就是ZK连接时候发生的心跳检测。再结合看下zk客户端和服务端的流程图,研究源码要有图,并且结合流程图去看代码才能更容易理解zkCli调用的就是zkCli.sh脚本,我们看下这个脚本是干嘛的
ZOOBIN="${BASH_SOURCE-$0}" ZOOBIN="$(dirname "${ZOOBIN}")" ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)" if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then . "$ZOOBINDIR"/../libexec/zkEnv.sh else . "$ZOOBINDIR"/zkEnv.sh fi "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \ -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS org.apache.zookeeper.ZooKeeperMain "$@" 复制代码
如果看不懂脚本的小伙伴,可以去看看ZooKeeperMain这个类也会明白会和ZooKeeperMain这个类有关,没错,zkCli命令就是调用了ZooKeeperMain.main()函数方法,main函数会解析输入的命令-server最终会调用下边这个方法。从上面的代码我们可以看出zkCli-server命令其实就是创建了一个Zookeeper实例,跟进ZooKeeper的构造函数
梳理一下构造函数做的事情
1.将传入的watcher设置到watchManager的defaultWatcher
2.构造地址解析器,解析ip:port
3.获取服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中
4.ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互
5.启动网络连接器ClientCnxn
接下来看一下几个主要方法的解读
1.地址解析器ConnectStringParser,它的主要对zk传入的connectString做两个处理:解析rootpath以及保存服务器地址列表chrootpath: zk允许每个客户端为自己设置已给命名空间。如果一个zookeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在自己的命名空间下。
2.HostProvider,HostProvider的实现类是StaticHostProvider,内部对解析出来的地址和端口封装为InetSocketAddress,并提供next方法返回一个服务器地址供客户端连接ClientCnxn,这个上面有提到过。
3.客户端服务端通信协议Packet,Packet是ClientCnxn的一个内部类,它也是客户端和服务端的通信协议
4.Packet协议中用于序列化网络传输的只有requestHeader,request,readOnly几个字段。ClientCnxn客户端连接服务端核心类ClientCnxn文件的代码很长也是整个Client的I/O代码主要分为三部分
1)I/O通信层ClientCnxnSocket 2)I/O核心调度线程ClientCnxn.SendThread 3)ClientCnxn.EventThread客户端事件处理线程 以上三个部分也是客户端可服务端建立连接以及客户端发送请求,处理服务端响应的核心方法。
5.doIO()处理I/O事件
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel();//获取Channel if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) {//读就绪 int rc = sock.read(incomingBuffer);//读取长度 if (rc < 0) { ///如果<0,表示读到末尾了,eof了异常关闭了 throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) {//如果还有数据 incomingBuffer.flip(); //切换模式 if (incomingBuffer == lenBuffer) { recvCount++;//新增接收次数 readLength();//获取len并给incomingBuffer分配对应空间 } else if (!initialized) {//如果连接还未初始化 readConnectResult();//读取connect并回复 enableRead();//启用读 if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { //如果有可以发送的packet enableWrite();//允许写,因为有要发送的packet } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true;//初始化完成 } else {//如果已连接,并且已经给incomingBuffer分配了对应len的空间 sendThread.readResponse(incomingBuffer);//读取response lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard();//更新上次接收时间 } } } //如果是写事件 if (sockKey.isWritable()) { synchronized(outgoingQueue) {//锁住队列 Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以发送的packet if (p != null) { updateLastSend(); if (p.bb == null) {//packet内部还未生成bytebuffer if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb);//发送到服务端 if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { disableWrite();//如果没有要发的就禁止写 } else if (!initialized && p != null && !p.bb.hasRemaining()) { disableWrite();.//禁止写 } else { // Just in case enableWrite();//开启写 } } } } 复制代码
总结下doIO主要分为读/写俩种方式处理
1.读:
1)_ 判断是否完成初始化,如果未完成初始化就完成初始化
2)读取len再给incomingBuffer分配对应空间
3) 读取response,比较特殊的是读会产生俩次,第一次会读取buffer长度,分配完空间,第二次会读取剩下的数据,可能是这样处理拆包吧
2.写:
1) 找到需要发送的packet
2) 生成需要发送的byteBuffer
3) 将buffer写入channel
4) 将packet从outgoingQueue移除并添加到pendingQueue等待server的响应
另外就是ClientCnxn.SendThread核心调度线程中的读取服务端响应readResponse(),sendPing()发送心跳,startConnect()建立连接,这里由于时间关系放下章解读。
最后在总结下整个流程
1. ZooKeeperMain解析zk命令,如果是-server则会实例化Zookeeper对象
2. 解析传入的server的地址并保存在服务器地址管理器HostProvider中
3. 创建网络连接器ClientCnxn,同时初始化核心队列outGoingQueue和pendingQueue,分别客户端的请求队列和等待响应队列
4. 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情
5. 启动SendThread和EventThread
6. SendThread从HostProvider中获取一个server地址,然后交给ClientCnxnSocket创建一个tcp长连接连接server
7. 连接创建完成,客户端开始发送请求,主要是ClientCnxnSocket负责从outgoingQueue中取出Packet对象,序列化成ByteBuffer后,向服务端进行发送
8. 接收server响应,通过doIO方法调用readResponse()处理响应
9. readResponse()将事件转发EventThread触发回调或者Watcher
后话
ZK的源码确实必其他开源难解读得多,但是收获很大,要相信一切的努力都是值得的,加油。这也是Zk源码解读系列的最后一篇,还是那个道理吧,得反反复复的看才能找到新的东西。