ZooKeeper源码分析的版本:3.4.10。
一.创建ZooKeeper对象
ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动作。
ZooKeeper有三种Watcher列表: (1)DataWatcher (2)ExistWatcher (3)ChildWatcher.
ClientCnxn是客户端和服务端通信的底层接口,和ClientCnxnSocket一起工作提供网络通信服务。
protected final ClientCnxn cnxn;// 成员变量cnxn,连接服务器,通过cnxn发送命令给服务端
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
......// 打印log
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);// 从传入的服务器地址字符串中解析出服务器地址
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());// 提供服务器地址,当服务器发生故障无法连接时,会自动连接其它的服务器
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);// 构建和服务器通信的对象cnxn
cnxn.start();
}
二.create操作
调用create在ZooKeeper中创建一个Node,返回值是成功创建的路径名称: 首先看看 create 方法:
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);// 设置操作代码为create
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);// 使用输入参数构造CreateRequest请求
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 将请求提交发送给服务器
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();// 从返回的CreateResponse中获取创建成功后的路径
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
在 create 中通过 submitRequest 来提交请求:
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);// 将CreateRequest转换成Packet包
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
queuePacket 将CreateRequest转换成Packet包:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);// 将CreateRequest转换成Packet包
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);// 将发送包放入队列,等待发送线程发送给服务器
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
三.delete操作
删除节点操作,提供同步和异步两种接口方式:
public void delete(final String path, int version, VoidCallback cb,
Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);// 校验传入的路径是否合法
final String serverPath;
// maintain semantics even in chroot case
// specifically - root cannot be deleted
// I think this makes sense even in chroot case.
if (clientPath.equals("/")) {
// a bit of a hack, but delete(/) will never succeed and ensures
// that the same semantics are maintained
serverPath = clientPath;
} else {
serverPath = prependChroot(clientPath);
}
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.delete);// 设置操作代码为delete
DeleteRequest request = new DeleteRequest();
request.setPath(serverPath);// 使用输入参数构造DeleteRequest请求
request.setVersion(version);
cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
serverPath, ctx, null);// 和create操作一样,调用queuePacket方法,将DeleteRequest转换成Packet包
}