概述
在讲解完zookeeper核心的选举部分的功能逻辑之后,另外一个我个人觉得需要理解的就是zookeeper的client-server之间的连接的建立过程,因为除了zookeeper各个节点之间的通信外,另外一大块就是zookeeper作为server端与client端的交互,包括之前的连接建立以及后续的各种操作命令入get/set等操作,这个博文专注于讲清楚前面部分的概念,后面部分的逻辑会有专门的一篇博文来阐述。
当然由于现在对于zookeeper的访问已经有很多开源的实现,排在首位的应该就是Curator了(最早介绍这个东西给我的还是我在车来了的同事袁翔,感谢当初带我入门),大部分情况我们使用的都是它封装的高级API,但是其实如果我们只是使用它的高级API而没有细究底层,我们还是不知道client-server之间的交互细节的,所以为了能够更直观的知道细节,我就在网上找了一个demo,基于这个demo我们可以开始开始我们的分析了。

说明
上面中我们看到的client通过new ZooKeeper的api创建了server的连接,然后开始的一系列操作,所以我们的源码分析也从这个地方开始。
zookeeper-session连接
Session建立 - client端
通过demo我们看出来,client连接zookeeper的server端其实就是创建了zookeeper对象。
创建zookeeper对象,核心的点是创建了ClientXnxn对象,该对象内部包含两个核心对象,分别是sendThread和eventThread。
启动zookeeper对象,实际上是启动sendThread和eventThread。当然我们关注的是sendThread这部分的工作,基本上建立连接(session的建立)也就是它在玩转的。
在sendThread当中我们需要处理各种连接事件,譬如注册OP_CONNECT/OP_WRITE/
OP_READ等相关事件。

说明:
Zookeeper当中主要是创建了ClientXnxn对象并进行启动,其中ClientCnxn对象内部主要对象是两个线程,分别是是sendThread和eventThread,其中sendThread负责连接server。

说明:
很明显的创建两个线程的逻辑,一个是sendThread,一个是eventThread。我们关注sendThread的run部分逻辑。

说明:
sendThread关联的几个对象包括sessionId,outgoingQueue等。
sendThread内部如果判断clientCnxnSocket没有建立连接,就会开始尝试建立连接。
我们关注的应该就是建立连接的过程,关注startConnect部分逻辑。

说明:
继续跟进connect部分的逻辑

说明:
继续跟进registerAndConnect部分的逻辑

说明:
首先将socket注册到selector当中并关注OP_CONNECT动作,这样异步连接成功的过程中就可以捕捉到事件了。
如果立即连接成功以后就直接进入后续处理了,关注一下primeConnection这个动作,在异步连接成功后也会执行这个函数的。

说明:
连接成功我们开始发送相关报文给server端,其中发送是通过放到outgoing队列中,有专门的发送线程负责发送。
其实发送了两种报文,但是不知道前面的报文是什么东西,看着像各种watch。
最后最重要的部分在于connectionPrimed部分操作,其实就是注册了OP_READ和OP_WRITE事件到selector当中了。

说明:
其实这个run逻辑是在sendThread当中执行的,我们真正关心的部分逻辑是在doTransport部分,里面其实是对异步连接成功的处理。

说明:
进入doTransport的逻辑我们看到了selector的执行部分,其中select返回的就是感兴趣的事件,我们在registerAndConnect逻辑当中注册了OP_CONNECT事件,所以假设异步连接成功了那么我们就再次进入了sendThread.primeConnection的逻辑。
在处理OP_CONNECT事件逻辑,sendThread.primeConnection的逻辑其实就是在发送package报文。
在处理OP_READ和OP_WRITE的逻辑,进入的其实是doIO部分的逻辑。

说明:
处理读事件也是一件挺有意思的事情,基本上你会看到ByteBuffer的各种用法,这里读取的逻辑其实很简单,先读取4Byte的数据长度,然后再读取剩余的实际报文数据。
incomingBuffer一开始读取的是报文长度,在readLnegth()内部其实就是读取实际数据,根据sock.read(incomingBuffer)获取报文的长度。

说明:
处理写事件,基本上就是发送报文,细节没仔细关注。
Session建立 - server端
server端其实就是接收client端的连接,接受连接部分的逻辑似乎有点绕,所以我默认就从server端已经接受了连接并开始处理报文的逻辑开始。
通过整个逻辑的串联了解下server对报文请求的处理,其实整个处理过程类似pipeLine的过程,由PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor三者进行的串联。

说明:
开始进入处理connect请求部分的逻辑,入口函数已经很明显了。

说明:
进入创建session部分的逻辑,注意在这里生成了cnxn对象,session密码,超时时间等。

说明:
创建session其实一个异步过程,这了我们生成了一个Request对象,然后提交这个Request对象。

说明:
首先我们通过PrepRequestProcessor操作进行第一波处理,processRequest操作其实把request提交到一个队列当中submittedRequests当中,具体的消费处理逻辑看下一个逻辑代码。

说明:
没错,这里开始进行第一波处理了,看函数就是PrepRequestProcessor进行处理,具体处理逻辑往后继续看。

说明:
其实这个地方我们基本上知道了zookeeper处理请求的核心逻辑代码,我们只是现在关心session的create事件而已。

说明:
这里我们看到createSession部分的逻辑,继续关注pRequest2Txn逻辑。

说明:
我们将进行下一步下一步处理,至于nextProcessor从哪里来的呢,可以看下一个截图。
其实nextProcessor其实是syncProcessor。

说明:
基本上可以看出来了,PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor。

说明:
只是把任务简单的提交了另外一个queue当中,也就是queuedRequests当中。

说明:
take任务继续下一步处理,这个还在SyncRequestProcessor当中,我们关注其实是flush动作,继续看下图的代码。

说明:
其实flush里面最后还是将任务提交到给FinalRequestProcessor进行处理。

说明:
进入zks.processTxn的逻辑,这部分代码其实很多,所以只截取了其中一部分。

说明:
把session加入到全局session当中。

说明:
真正完成session初始的入口函数。

说明:
一开始通过serverCnxnFactory.registerConnection将session注册到server端,将session创建的结果发送回client端。

说明:
在server端维持新建的session对象,但是我暂时也不知道干嘛。我们在创建ServerCnxnFactory的过程中会生成server端负责accept连接,这部分到时候后面再继续补充。