一、前言
前面已经分析了Watcher机制中的大多数类,本篇对于ZKWatchManager的外部类Zookeeper进行分析。
二、ZooKeeper源码分析
2.1 类的内部类
ZooKeeper的内部类框架图如下图所示
说明:
ZKWatchManager,Zookeeper的Watcher管理者,其源码在之前已经分析过,不再累赘。
WatchRegistration,抽象类,用作watch注册。
ExistsWatchRegistration,存在性watch注册。
DataWatchRegistration,数据watch注册。
ChildWatchRegistration,子节点注册。
States,枚举类型,表示服务器的状态。
- WatchRegistration
接口类型,表示对路径注册监听。
说明:可以看到WatchRegistration包含了Watcher和clientPath字段,表示监听和对应的路径,值得注意的是getWatches方式抽象方法,需要子类实现,而在register方法中会调用getWatches方法,实际上调用的是子类的getWatches方法,这是典型的工厂模式。register方法首先会判定是否需要添加监听,然后再进行相应的操作,在WatchRegistration类的默认实现中shouldAddWatch是判定返回码是否为0。 - ExistsWatchRegistration
说明:ExistsWatchRegistration 表示对存在性监听的注册,其实现了getWatches方法,并且重写了shouldAddWatch方法,getWatches方法是根据返回码的值确定返回dataWatches或者是existWatches。 - DataWatchRegistration
说明:DataWatchRegistration表示对数据监听的注册,其实现了getWatches方法,返回dataWatches。 - ChildWatchRegistration
说明:ChildWatchRegistration表示对子节点监听的注册,其实现了getWatches方法,返回childWatches。 - States
说明:States为枚举类,表示服务器的状态,其有两个方法,判断服务器是否存活和判断客户端是否连接至服务端。
2.2 类的属性
说明:ZooKeeper类存维护一个ClientCnxn类,用来管理客户端与服务端的连接。
2.3 类的构造函数 - ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)型构造函数
说明:该构造函数会初始化WatchManager的defaultWatcher,同时会解析服务端地址和端口号,之后根据服务端的地址生成HostProvider(其会打乱服务器的地址),之后生成客户端管理并启动,注意此时会调用getClientCnxnSocket函数,其源码如下
说明:该函数会利用反射创建ClientCnxnSocketNIO实例 - public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException型构造函数
说明:此型构造函数和之前构造函数的区别在于本构造函数提供了sessionId和sessionPwd,这表明用户已经之前已经连接过服务端,所以能够获取到sessionId,其流程与之前的构造函数类似,不再累赘。
2.4 核心函数分析 - create函数
函数签名:
public String create(final String path, byte data[], List acl, CreateMode createMode)
throws KeeperException, InterruptedException
说明:该create函数是同步的,主要用作创建节点,其大致步骤如下
① 验证路径是否合法,若不合法,抛出异常,否则进入②
② 添加根空间,生成请求头、请求、响应等,并设置相应字段,进入③
③ 通过客户端提交请求,判断返回码是否为0,若不是,则抛出异常,否则,进入④
④ 除去根空间后,返回响应的路径
其中会调用submitRequest方法,其源码如下
说明:submitRequest会将请求封装成Packet包,然后一直等待packet包响应结束,然后返回;若没结束,则等待。可以看到其是一个同步方法。 - create函数
函数签名:
public void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)
说明:该create函数是异步的,其大致步骤与同步版的create函数相同,只是最后其会将请求打包成packet,然后放入队列等待提交。 - delete函数
函数签名:public void delete(final String path, int version) throws InterruptedException, KeeperException
说明:该函数是同步的,其流程与create流程相似,不再累赘。 - delete函数
函数签名:public void delete(final String path, int version, VoidCallback cb, Object ctx)
说明:该函数是异步的,其流程也相对简单,不再累赘。 - multi函数
说明:该函数用于执行多个操作或者不执行,其首先会验证每个操作的合法性,然后将每个操作添加根空间后加入到事务列表中,之后会调用multiInternal函数,其源码如下
说明:multiInternal函数会提交多个操作并且等待响应结果集,然后判断结果集中是否有异常,若有异常则抛出异常,否则返回响应结果集。 exists函数
函数签名:public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public Stat exists(final String path, Watcher watcher)throws KeeperException, InterruptedException{
final String clientPath = path; // 验证路径是否合法 PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { // 生成存在性注册 wcb = new ExistsWatchRegistration(watcher, clientPath); } // 添加根空间 final String serverPath = prependChroot(clientPath); // 新生请求头 RequestHeader h = new RequestHeader(); // 设置请求头类型 h.setType(ZooDefs.OpCode.exists); // 新生节点存在请求 ExistsRequest request = new ExistsRequest(); // 设置路径 request.setPath(serverPath); // 设置Watcher request.setWatch(watcher != null); // 新生设置数据响应 SetDataResponse response = new SetDataResponse(); // 提交请求 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { // 判断返回码 if (r.getErr() == KeeperException.Code.NONODE.intValue()) { return null; } throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } // 返回结果的状态 return response.getStat().getCzxid() == -1 ? null : response.getStat();}
说明:该函数是同步的,用于判断指定路径的节点是否存在,值得注意的是,其会对指定路径的结点进行注册监听。- exists
函数签名:public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void exists(final String path, Watcher watcher,
{StatCallback cb, Object ctx)
final String clientPath = path;
// 验证路径是否合法
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) { // 生成存在性注册
}wcb = new ExistsWatchRegistration(watcher, clientPath);
// 添加根空间
final String serverPath = prependChroot(clientPath);
// 新生请求头
RequestHeader h = new RequestHeader();
// 设置请求头类型
h.setType(ZooDefs.OpCode.exists);
// 新生节点存在请求
ExistsRequest request = new ExistsRequest();
// 设置路径
request.setPath(serverPath);
// 设置Watcher
request.setWatch(watcher != null);
// 新生设置数据响应
SetDataResponse response = new SetDataResponse();
// 将请求封装成packet,放入队列,等待执行
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
}clientPath, serverPath, ctx, wcb);
说明:该函数是异步的,与同步的流程相似,不再累赘。
之后的getData、setData、getACL、setACL、getChildren函数均类似,只是生成的响应类别和监听类别不相同,大同小异,不再累赘。
三、总结
本篇博文分析了Watcher机制的ZooKeeper类,该类包括了对服务器的很多事务性操作,并且包含了同步和异步两个版本,但是相对来说,较为简单。