概述
先打一个预告,写完这篇zookeeper-client端的文章以后,基本上zookeeper的系列已经完结了,后面会进入flume系列的源码阅读当中,敬请期待,而这篇文章也是上周去听分享时候砸场子答应别人写的文章。
zookeeper-client端主要介绍的内容包括client的核心属性,client-server的交互过程(以getData作为一个例子),client-watcher的触发过程,client的重连过程。至于client-server建立连接的过程,建议看下《zookeeper - 主从通信(3)》。
zookeeper-client
zookeeper-client核心属性如下
- ZKWatchManager:用于client端watcher注册的核心数据结构
- ExistsWatchRegistration:用于exists方法注册的watcher事件
- DataWatchRegistration:用于getData方法注册的watcher事件
- ChildWatchRegistration:用于getChildren方法注册watcher事件
- ClientCnxn:用于维持client和server端的连接
ClientCnxn的核心属性
- SendThread:用于处理client和server的读写操作
- EventThread:用于处理server端发回来的event事件
- outgoingQueue:用于保存待发送的packet的queue
- pendingQueue:用于保存已发送的packet的queue并用来业务处理
sendThread介绍
sendThread主要用于client-server的发送和接收的交互过程,这里我们以getData作为一个例子进行分析,通过getData的分析基本上我们能够了解清楚client的发送过程。
核心点:1、创建DataWatchRegistration对象,后面会注册到ZKWatchManager当中;2、通过queuePacket将请求报文加入发送队列。
/**
 * Asynchronous getData: builds a getData request for {@code path} and queues it on the
 * connection.
 *
 * @param path    client-side path; validated before anything else happens
 * @param watcher optional watcher; when non-null a {@code DataWatchRegistration} travels
 *                with the packet and the request's watch flag is set
 * @param cb      callback handed to the queued packet
 * @param ctx     opaque context object handed to the queued packet
 */
public void getData(final String path, Watcher watcher,
        DataCallback cb, Object ctx) {
    PathUtils.validatePath(path);

    // The registration keeps the client-side (un-chrooted) path.
    final boolean watching = watcher != null;
    final WatchRegistration registration =
            watching ? new DataWatchRegistration(watcher, path) : null;

    // The request itself carries the path with the chroot prepended.
    final String serverPath = prependChroot(path);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getData);

    final GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watching);

    cnxn.queuePacket(header, new ReplyHeader(), request, new GetDataResponse(), cb,
            path, serverPath, ctx, registration);
}
queuePacket动作负责将packet加入到outgoingQueue当中,然后通过packetAdded方法唤醒sendThread发送packet。
/**
 * Wraps a request in a {@code Packet}, places it on the outgoing queue (or fails it
 * through {@code conLossPacket} when the connection is dead or closing) and notifies
 * the client socket that a packet was added.
 *
 * @param h                   request header
 * @param r                   reply header to be filled in later
 * @param request             request record
 * @param response            response record
 * @param cb                  asynchronous callback, may be null
 * @param clientPath          path as seen by the client
 * @param serverPath          path as sent to the server
 * @param ctx                 caller context object
 * @param watchRegistration   watch registration carried by the packet, may be null
 * @param watchDeregistration watch deregistration carried by the packet, may be null
 * @return the packet that was created
 */
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    final Packet outbound = new Packet(h, r, request, response, watchRegistration);
    outbound.cb = cb;
    outbound.ctx = ctx;
    outbound.clientPath = clientPath;
    outbound.serverPath = serverPath;
    outbound.watchDeregistration = watchDeregistration;

    // The synchronized block here is for two purpose:
    // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
    // 2. synchronized against each packet. So if a closeSession packet is added,
    //    later packet will be notified.
    synchronized (state) {
        final boolean accepting = state.isAlive() && !closing;
        if (accepting) {
            // A closeSession request flips the connection into closing mode
            // before the packet itself is queued.
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(outbound);
        } else {
            conLossPacket(outbound);
        }
    }

    // Tell the socket layer that there is something new to send.
    sendThread.getClientCnxnSocket().packetAdded();
    return outbound;
}
SendThread的核心run代码,真正执行的过程在doTransport当中,这里可以看到我们传进去的pendingQueue,用于每次从outgoingQueue取的报文发送的同时保存到pendingQueue便于处理server发回的响应,所有的核心操作都在doTransport的方法当中。
// Send-thread main loop (abridged excerpt: the bodies of the state checks and of the
// catch block are empty here, and the closing braces of the loop/method are not shown).
public void run() {
while (state.isAlive()) {
try {
// Socket not connected: this is where a connection attempt would be started.
if (!clientCnxnSocket.isConnected()) {
}
// Already-connected case.
if (state.isConnected()) {
}
// Already-connected case (second check; its body is not shown in this excerpt).
if (state.isConnected()) {
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
}
// Core step: delegate to the client socket, passing pendingQueue along with it.
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
}
doTransport中通过selector操作进行处理各种nio事件,我们重点关注SelectionKey.OP_READ | SelectionKey.OP_WRITE事件,进入doIO处理操作。
/**
 * Waits on the selector for up to {@code waitTimeOut}, then handles every selected key:
 * a connect-ready key finishes the connection and primes it, a read/write-ready key is
 * passed to {@code doIO}. Afterwards write interest is enabled when the client is
 * connected and a sendable packet is waiting.
 *
 * @param waitTimeOut  timeout passed to {@code Selector.select}
 * @param pendingQueue queue forwarded to {@code doIO}
 * @param cnxn         owning connection, forwarded to {@code doIO}
 * @throws IOException          propagated from the selector, channel or {@code doIO}
 * @throws InterruptedException propagated from {@code doIO}
 */
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);

    final Set<SelectionKey> readyKeys;
    synchronized (this) {
        readyKeys = selector.selectedKeys();
    }

    // Refresh the cached notion of "now" before any timestamps are recorded.
    updateNow();

    final int ioMask = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    for (SelectionKey key : readyKeys) {
        final SocketChannel channel = (SocketChannel) key.channel();

        final boolean connectReady = (key.readyOps() & SelectionKey.OP_CONNECT) != 0;
        if (connectReady) {
            // Connection completed: record timestamps/addresses and prime the connection.
            if (channel.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
            continue;
        }

        if ((key.readyOps() & ioMask) != 0) {
            doIO(pendingQueue, cnxn);
        }
    }

    if (sendThread.getZkState().isConnected()
            && findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
        enableWrite();
    }

    readyKeys.clear();
}
doIO过程主要包括read/write两个过程,write的过程从outgoingQueue中获取一个packet发送并保存到pendingQueue当中;read的过程先读取报文的长度再读取报文内容,然后进行sendThread.readResponse处理。
// Performs socket I/O for the selected key: a read when the key is readable and a write
// of one packet from outgoingQueue when it is writable.
// NOTE(review): the braces of this excerpt are unbalanced (one closing brace too many at
// the end); some lines around disableWrite() appear to have been elided — confirm
// against the full source before relying on the write-interest logic shown here.
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// Read side.
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
// Act only once the current buffer has been filled completely. The buffer is either
// the length buffer, the connect result, or a regular response (see branches below).
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
// Identity check: the buffer that was just filled is the length buffer.
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
// Not initialized yet: the buffer is handled as the connect result.
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
// Switch back to the length buffer for the next read.
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// Regular response: hand the filled buffer to the send thread for decoding.
sendThread.readResponse(incomingBuffer);
// Switch back to the length buffer for the next read.
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// Write side.
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
// Ping and auth requests do not get an xid assigned here.
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// Build the byte buffer that will be written to the socket.
p.createBB();
}
sock.write(p.bb);
// Fully written: the packet leaves the outgoing queue.
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
// Requests other than ping/auth are parked in pendingQueue so the reply can be
// matched to them later; ping/auth packets are simply dropped after sending.
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
readResponse的处理过程当中,负责处理ping报文、AuthPacket报文、event事件、数据报文,这里着重关注报文处理和事件处理的两个过程,其中报文处理我们跟踪finishPacket方法,针对事件处理我们关注event的反序列化过程以及通过eventThread.queueEvent过程加入到eventThread当中。
// Decodes the reply header from the buffer and dispatches on its xid: ping replies,
// auth replies, notifications, SASL traffic, and finally replies to pending requests.
// NOTE(review): the xid -4 branch below has an extra closing brace and a stray return,
// so lines appear to have been elided from this excerpt; the excerpt also shows no
// deserialization of the response body — confirm both against the full source.
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// Ping reply: nothing further to do.
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
return;
}
// xid -4: queue a session event with state AuthFailed on the event thread.
if (replyHdr.getXid() == -4) {
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
return;
}
// xid -1: a notification (event) has arrived.
if (replyHdr.getXid() == -1) {
// -1 means notification
// Decode the WatcherEvent carried in the buffer.
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// Wrap it in a WatchedEvent and queue that on the event thread.
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
return;
}
// While tunnelAuthInProgress() is true the payload is read as a SASL token and
// passed to the SASL client.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
// Reply to a request: take the packet off the head of pendingQueue.
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
// Remove the already-sent packet this reply is paired with.
packet = pendingQueue.remove();
}
try {
// Copy xid, error code and zxid from the wire header into the packet's reply header.
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
} finally {
// Always complete the packet, even if copying the header fields failed.
finishPacket(packet);
}
}
报文处理的后续步骤,这里有一个核心步骤p.watchRegistration.register,这里会把我们通过getData方法传入的watcher对象注册到ZKWatchManager当中。
/**
 * Completes a packet whose reply header has been populated: applies the watch
 * registration and deregistration it carries, then either wakes up threads waiting on
 * the packet (no callback) or passes the packet to the event thread (callback present).
 *
 * @param p the packet to finish
 */
private void finishPacket(Packet p) {
    final int replyErr = p.replyHeader.getErr();

    // Registration decides for itself, based on the error code, whether to add the watch.
    if (p.watchRegistration != null) {
        p.watchRegistration.register(replyErr);
    }

    if (p.watchDeregistration != null) {
        try {
            final Map<EventType, Set<Watcher>> removed =
                    p.watchDeregistration.unregister(replyErr);
            for (Entry<EventType, Set<Watcher>> byType : removed.entrySet()) {
                final Set<Watcher> removedWatchers = byType.getValue();
                if (removedWatchers.isEmpty()) {
                    continue;
                }
                queueEvent(p.watchDeregistration.getClientPath(), replyErr,
                        removedWatchers, byType.getKey());
                // ignore connectionloss when removing from local
                // session
                p.replyHeader.setErr(Code.OK.intValue());
            }
        } catch (KeeperException.NoWatcherException nwe) {
            p.replyHeader.setErr(nwe.code().intValue());
        } catch (KeeperException ke) {
            p.replyHeader.setErr(ke.code().intValue());
        }
    }

    if (p.cb != null) {
        // Callback present: mark finished and let the event thread deliver the result.
        p.finished = true;
        eventThread.queuePacket(p);
        return;
    }

    // No callback: mark finished and notify whoever is waiting on the packet's monitor.
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}
register的过程中的getWatches会从ExistsWatchRegistration、DataWatchRegistration、ChildWatchRegistration当中获取各自对应的watches集合,watches就是不同的Registration保存的watcher对象。eventThread当中取得watcher然后执行回调。
/**
 * Adds this registration's watcher to the watch map selected by {@code getWatches(rc)},
 * keyed by the client path, provided {@code shouldAddWatch(rc)} allows it.
 *
 * @param rc result code of the request that carried this registration
 */
public void register(int rc) {
    if (!shouldAddWatch(rc)) {
        return;
    }

    final Map<String, Set<Watcher>> registry = getWatches(rc);
    synchronized (registry) {
        Set<Watcher> forPath = registry.get(clientPath);
        if (forPath == null) {
            // First watcher on this path: create its set.
            forPath = new HashSet<Watcher>();
            registry.put(clientPath, forPath);
        }
        forPath.add(watcher);
    }
}
eventThread.queueEvent的过程,这里我们走的是queueEvent方法中materializedWatchers == null的分支,这里我们需要进一步关注materialize方法。
/**
 * Queues an event whose watchers still have to be looked up.
 *
 * @param event the event to deliver
 */
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

/**
 * Pairs an event with the watchers that must receive it and appends the pair to
 * {@code waitingEvents}. A {@code None}-typed event that repeats the current session
 * state is dropped.
 *
 * @param event                 the event to deliver
 * @param materializedWatchers  watchers already resolved by the caller, or null to
 *                              resolve them through {@code watcher.materialize}
 */
private void queueEvent(WatchedEvent event,
        Set<Watcher> materializedWatchers) {
    final boolean sessionEvent = event.getType() == EventType.None;
    if (sessionEvent && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    final Set<Watcher> targets;
    if (materializedWatchers != null) {
        // Caller supplied the watchers: work on a private copy.
        targets = new HashSet<Watcher>();
        targets.addAll(materializedWatchers);
    } else {
        // materialize the watchers based on the event
        targets = watcher.materialize(event.getState(),
                event.getType(), event.getPath());
    }

    // queue the pair (watch set & event) for later processing
    waitingEvents.add(new WatcherSetEventPair(targets, event));
}
materialize的过程中我们以NodeDataChanged事件为例,我们把在finishPacket中注册的watcher(就是保存在dataWatches数据结构中),保存到result结果当中返回并触发这些watcher。
/**
 * Builds the set of watchers to notify for an event.
 *
 * <p>For {@code None} events the default watcher and every watcher in the data, exists
 * and child maps are returned; the maps are cleared only when
 * {@code disableAutoWatchReset} is set and the state is not {@code SyncConnected}.
 * For node events the entries for {@code clientPath} are removed from the relevant maps
 * and returned.
 *
 * @param state      session state carried by the event
 * @param type       event type
 * @param clientPath path the event refers to
 * @return the watchers to notify
 * @throws RuntimeException if the event type is not handled
 */
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type,
        String clientPath) {
    final Set<Watcher> triggered = new HashSet<Watcher>();

    switch (type) {
    case None: {
        triggered.add(defaultWatcher);
        final boolean purge = disableAutoWatchReset
                && state != Watcher.Event.KeeperState.SyncConnected;

        synchronized (dataWatches) {
            for (Set<Watcher> perPath : dataWatches.values()) {
                triggered.addAll(perPath);
            }
            if (purge) {
                dataWatches.clear();
            }
        }

        synchronized (existWatches) {
            for (Set<Watcher> perPath : existWatches.values()) {
                triggered.addAll(perPath);
            }
            if (purge) {
                existWatches.clear();
            }
        }

        synchronized (childWatches) {
            for (Set<Watcher> perPath : childWatches.values()) {
                triggered.addAll(perPath);
            }
            if (purge) {
                childWatches.clear();
            }
        }

        return triggered;
    }

    case NodeDataChanged:
    case NodeCreated:
        // Both event types consume the data and exists watches on the path.
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), triggered);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), triggered);
        }
        break;

    case NodeChildrenChanged:
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), triggered);
        }
        break;

    case NodeDeleted:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), triggered);
        }
        // XXX This shouldn't be needed, but just in case
        synchronized (existWatches) {
            final Set<Watcher> staleExists = existWatches.remove(clientPath);
            if (staleExists != null) {
                addTo(staleExists, triggered);
                LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
            }
        }
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), triggered);
        }
        break;

    default: {
        final String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
        LOG.error(msg);
        throw new RuntimeException(msg);
    }
    }

    return triggered;
}
watcherEvent的数据结构,属性中只包括type、state、path,不包括具体的数据,不包括具体的数据,不包括具体的数据,重要的事情说三遍哦。
/**
 * Wire record for a watch notification. It carries only three fields — type, state and
 * path — and no node data. (Excerpt: the class's closing brace is not shown.)
 */
public class WatcherEvent implements Record {
// Event type, as an int code.
private int type;
// State, as an int code.
private int state;
// Path the event refers to.
private String path;
/** Creates an empty record; fields are filled by setters or {@code deserialize}. */
public WatcherEvent() {
}
/**
 * Creates a record with all three fields set.
 *
 * @param type  event type code
 * @param state state code
 * @param path  path the event refers to
 */
public WatcherEvent(
int type,
int state,
String path) {
this.type=type;
this.state=state;
this.path=path;
}
/** @return the event type code */
public int getType() {
return type;
}
/** @param m_ new event type code */
public void setType(int m_) {
type=m_;
}
/** @return the state code */
public int getState() {
return state;
}
/** @param m_ new state code */
public void setState(int m_) {
state=m_;
}
/** @return the path the event refers to */
public String getPath() {
return path;
}
/** @param m_ new path */
public void setPath(String m_) {
path=m_;
}
/**
 * Writes the record as type, state, path (in that order) between start/end record markers.
 *
 * @param a_  archive to write to
 * @param tag record tag
 * @throws java.io.IOException if the archive fails to write
 */
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(type,"type");
a_.writeInt(state,"state");
a_.writeString(path,"path");
a_.endRecord(this,tag);
}
/**
 * Reads type, state, path (in that order) from the archive into this record.
 *
 * @param a_  archive to read from
 * @param tag record tag
 * @throws java.io.IOException if the archive fails to read
 */
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
type=a_.readInt("type");
state=a_.readInt("state");
path=a_.readString("path");
a_.endRecord(tag);
}
eventThread介绍
处理事件的入口,eventThread负责处理在sendThread中往waitingEvents队列当中放置的event事件。
// Event-thread loop: takes entries from waitingEvents and processes them; once the
// death marker has been seen, the loop exits as soon as the queue is empty.
// (Excerpt: the catch/finally of the try and the method's closing brace are not shown.)
public void run() {
try {
isRunning = true;
while (true) {
// Take the next queued entry.
Object event = waitingEvents.take();
if (event == eventOfDeath) {
// Death marker: remember it, but keep draining what is still queued.
wasKilled = true;
} else {
// Regular entry: dispatch it.
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
event处理过程processEvent主要处理watcher、处理异步处理事件、处理response报文,处理watcher事件的最后代码watcher.process(pair.event),这里就已经执行到了各个watcher的回调代码中了。
// Dispatches one queued entry. Three kinds are handled:
//   1. WatcherSetEventPair - each watcher in the pair is invoked with the event;
//   2. LocalCallback       - the callback is invoked with lcb.rc and null results;
//   3. anything else       - cast to Packet; the callback matching the response type
//                            is invoked with the reply's error code and, on success,
//                            the response payload.
// Any throwable escaping the dispatch is logged and swallowed.
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// Watch event: run every watcher's process() callback.
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
// A failing watcher is logged and does not stop the remaining watchers.
LOG.error("Error while calling watcher ", t);
}
}
} else if (event instanceof LocalCallback) {
// Local callback: choose the processResult overload by callback type and pass
// null for every result argument.
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else {
// Fallback: the callback is cast to VoidCallback without an instanceof check.
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx);
}
} else {
// Response packet for an asynchronous request.
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
// rc stays 0 unless the reply header carries a non-zero error code.
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
// These three response types all deliver a Stat through StatCallback.
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
// getData: data and stat on success, nulls on error.
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
// getACL: ACL list and stat on success, nulls on error.
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
// getChildren: children list on success, null on error.
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
// getChildren2: children list and stat on success, nulls on error.
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
// create: the returned path has the chroot prefix cut off when a chroot is set.
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
// create2: same chroot handling as create, plus the stat.
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
// multi: report the error code of the first ErrorResult whose code is not OK,
// or rc when there is none.
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
// newRc is assigned inside the condition; the loop stops at the first non-OK code.
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result)
.getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
// No typed response matched: a VoidCallback receives only rc, path and ctx.
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
client重连介绍
从这里可以看出来,基本上我们通过next获取一个地址进行重连。
// Send-thread loop, reconnect portion (excerpt: the remainder of the loop body, the
// catch block and the closing braces are not shown).
public void run() {
while (state.isAlive()) {
try {
// Socket is not connected: choose a server address and start connecting.
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
// Use rwServerAddress when one is set, resetting it to null afterwards;
// presumably this is a read/write server found earlier — confirm against the full source.
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// Otherwise take the next address from the host provider.
serverAddress = hostProvider.next(1000);
}
// Start the actual connection attempt to the chosen address.
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
}