重理思路#
代码看到这里,重新调整一下思路接着往下看,首先作为服务端我们看到了上面的NIOServerCnxnFactory.java
类中的开启了本类维护的新线程,让服务端有了接收新连接的能力
既然是线程类,就存有Run方法,ZK的设计思路就是在NIOServerCnxnFactory.java
的run()方法中检测客户端有感兴趣的事件时,就进入DoIO()
从bytebuffer中将用户的请求解析出来,然后交由最后面的三个处理器排队处理
NIOServerCnxnFactory.java
的run方法部分代码如下
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // todo 接收数据,这里会间歇性的接收到客户端ping NIOServerCnxn c = (NIOServerCnxn) k.attachment(); // todo 跟进去, 和客户端的那一套很相似了 c.doIO(k); } else {
继续跟进readPayload()
-->readRequest()
-->zkServer.processPacket(this, incomingBuffer)
, 如下是processPacket()
方法的部分源码
else { // todo 将上面的信息包装成 request Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // todo 提交request, 其实就是提交给服务端的 process处理器进行处理 submitRequest(si); }
继续跟进submitRequest()
,终于可以看到它尝试将这个request交给第一个处理器处理,但是因为这是在服务器启动的过程中,服务端并不确定服务器的第一个处理器线程到底有没有开启,因此它先验证,甚至会等一秒,直到处理器线程完成了启动的逻辑
// todo 交由服务器做出request的处理动作 public void submitRequest(Request si) { // todo 如果 firstProcessor 不存在,就报错了 if (firstProcessor == null) { synchronized (this) { try { while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // todo 验证合法性 boolean validpacket = Request.isValid(si.type); if (validpacket) { // todo request合法的化,交给firstProcessor (实际是PrepRequestProcessor)处理 跟进去 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); }
经过上面的阅读,不难发现,最终来自于客户端的request都将会流经服务端的三个处理器,下面就看看它们到底做了哪些事
PrepRequestProcessor(线程类)#
因为他本身就是线程类,我们直接看他的run()
,最直接的可以看到,它将请求交给了pRequest(req)
处理
public void run() { try { while (true) { // todo 取出请求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; //todo 处理请求 if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } // todo 着重看这里, 跟进去 pRequest(request); }
下面跟进它的pRequest()
,下面是它的源码,通过switch分支针对不同类型的请求做出不同的处理,下面用create类型的请求举例
protected void pRequest(Request request) throws RequestProcessorException { // LOG.info("Prep>>> cxid = " + request.cxid + " type = " + // request.type + " id = 0x" + Long.toHexString(request.sessionId)); request.hdr = null; request.txn = null; // todo 下面的不同类型的信息, 对应这不同的处理器方式 try { switch (request.type) { case OpCode.create: // todo 创建每条记录对应的bean , 现在还是空的, 在面的pRequest2Txn 完成赋值 CreateRequest createRequest = new CreateRequest(); // todo 跟进这个方法, 再从这个方法出来,往下运行,可以看到调用了下一个处理器 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); break; . . . request.zxid = zks.getZxid(); // todo 调用下一个处理器处理器请求 SyncRequestProcessor nextProcessor.processRequest(request);
总览思路,现在当前的处理器进行状态的相关处理,处理完之后移交给下一个处理器
跟进pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
依然是用create类型距离, 它在下面的方法中做了如下几件事
- 因为create是事务类型的请求,它在一开始就给request构建了事务头 txnHeader
- 将request中的属性反序列化进
CreateRequest
类中 - 校验一下权限,检查一下访问时是否需要访问权限,如果需要,当前访问者有没有足够的权限
- 根据用户想create新node而输入的string,进行截取取出它的父级路径,因为创建新节点时,需在修改父路径对应节点的相关信息
- 校验父节点是否是临时节点
- 修改父节点是属性
- 更新zxid(创建znode事务id)
- childCount++
- 更新cversion(针对当前子节点修改的次数)
- 将这条记录添加到
outstandingChanges
集合中
// todo 第二个参数位置上的 record 是上一步new 出来的空对象--> protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { // todo 使用request的相关属性,创建出 事务Header request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { case OpCode.create: // todo 校验session的情况 zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if(deserialize) // todo 反序列化 ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); // todo 获取出request中的path String path = createRequest.getPath(); int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) { LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(request.sessionId)); throw new KeeperException.BadArgumentsException(path); } // todo 进行权限的验证 List<ACL> listACL = removeDuplicates(createRequest.getAcl()); if (!fixupACL(request.authInfo, listACL)) { throw new KeeperException.InvalidACLException(path); } // todo 获取父级路径 String parentPath = path.substring(0, lastSlash); // todo 跟进这个方法, 跟进父节点的路径找到 parentRecord ChangeRecord parentRecord = getRecordForPath(parentPath); // todo 校验 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); // todo 取出父节点的C version (子节点的version) int parentCVersion = parentRecord.stat.getCversion(); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } // todo 判断当前的父节点 是不是临时节点 boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0; if (ephemeralParent) { // todo 父节点如果是临时节点, 直接抛异常结束 throw new KeeperException.NoChildrenForEphemeralsException(path); } // todo 父节点不是临时节点, 将创建的节点的VCersion 就是在父节点的基础上+1 int newCversion = parentRecord.stat.getCversion()+1; request.txn = new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(), newCversion); StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } // todo 修改了父节点的一些元信息 parentRecord = parentRecord.duplicate(request.hdr.getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); //todo 添加两条修改记录 addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL)); break;
SyncRequestProcessor(线程类)#
一个create请求经过第一个处理器进行状态相关的处理之后,就来到当前这个第二个处理器, 当前处理器的主要作用就是负责同步持久化,将request持久化到磁盘,人们说的打快照,也就是将DataTree序列化后持久化的工作,他的主要逻辑都在下面的Run方法中
- 首先是
while(true)
保证了作为线程类的它可以无休止的一直运行下去 - 尝试从队列中取出request
- 队列为空,阻塞等待,直接不为空取出req再处理
- 队列不为空,直接取出一个req,接着处理
- 请求被取出来之后通过
if-else
分支进行不同的处理
- 如果是事务类型的
- 非事务类型的request
public void run() { try { // todo 写日志的初始数量 int logCount = 0; // we do this in an attempt to ensure that not all of the serversin the ensemble take a snapshot at the same time // todo 设置RandRoll的大小, 确保所有服务器在同一个时间不使用同一个快照 setRandRoll(r.nextInt(snapCount / 2)); //todo 这个处理器拥有自己的无限循环 while (true) { // todo 初始请求为null Request si = null; // todo toFlush是一个LinkedList, 里面存放着需要 持久化到磁盘中的request if (toFlush.isEmpty()) { // todo 没有需要刷新进disk的 // todo 这个take()是LinkedList原生的方法 // todo 从请求队列中取出一个请求,如果队列为空就会阻塞在这里 si = queuedRequests.take(); } else { // todo 如果队列为空,直接取出request, 并不会阻塞 si = queuedRequests.poll(); if (si == null) { //todo 刷新进磁盘 flush(toFlush); continue; } } // todo 在关闭处理器之前,会添加requestOfDeadth,表示关闭后不再接收任何请求 if (si == requestOfDeath) { break; } //todo 成功的从队列中取出了请求 if (si != null) { // track the number of records written to the log // todo 将request 追加到日志文件, 只有事物性的请求才会返回true if (zks.getZKDatabase().append(si)) { // todo 刚才的事物日志放到请求成功后,添加一次, log数+1 logCount++; // todo 当持久化的request数量 > (快照数/2 +randRoll) 时, 创建新的日志文件 if (logCount > (snapCount / 2 + randRoll)) { setRandRoll(r.nextInt(snapCount / 2)); // todo roll the log // todo 跟进去这个方法, 最终也会执行 this.logStream.flush(); // todo 新生成一个日志文件 // todo 调用rollLog函数翻转日志文件 zks.getZKDatabase().rollLog(); // todo 拍摄日志快照 if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { // todo 创建线程处理快照 snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { // todo 打快照, 跟进去 zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } } }; // todo 开启快照线程 snapInProcess.start(); } // todo 重置为0 logCount = 0; } } else if (toFlush.isEmpty()) { // todo 如果等待被刷新进disk的request为空 // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor // todo 查看此时toFlush是否为空,如果为空,说明近段时间读多写少,直接响应 if (nextProcessor != null) { // todo 最终也会调用 nextProcessor 处理request FinalRequestProcess nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } } continue; } // todo 流里面的内容不了立即刷新, 调用 toFlush.add(si); 累积request toFlush.add(si); if (toFlush.size() > 1000) { // todo 当toFlush中的 request数量 > 1000 将会flush flush(toFlush); } } }
究竟是不是 事务类型的req,是在上面的代码中的
zks.getZKDatabase().append(si)
实现的,true表示属于事务类型,跟进这个方法,最终回来到FileTxnLog.java
的append()
,源码如下
代码是挺长的,但是逻辑也算是清楚,如下
- 根据有没有request的头,判断是否是事务类型,对于查询一类的非实物类型的请求来说,直接返回false退出,也不用往日志文件中添加什么信息,事实上确实如此,就直接进入非事务类型的req,也可以看到
continue
没有一点持久化到磁盘的逻辑 - 其他类型的会对服务端的数据状态造成改变的事务性请求,会在这里被持久化进logDir中的日志文件,,还有个细节第一次的事务类型的请求会在这里完成持久化进磁盘的操作,除了第一次之外,其他的都会被批处理,原酒就是下面代码中的这一行
if (logStream==null) {
- 满足这个条件
if (logCount > (snapCount / 2 + randRoll))
之后,就会进行一次日志文件的滚动,说白了,就是现在的日志文件体积太大了,然后得保存原来的就日志文件,创建一个新的空的日志文件继续使用 - 打快照, 实际上就是将内存中的DataBase序列化后持久保存进内存中,这样做对数据的恢复是很有帮助的,比如集群的Follower可以通过Leader的快照迅速完成数据的同步
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr == null) { return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } else { lastZxidSeen = hdr.getZxid(); } // todo 第一次来==null。 再执行过来就不进来了,等着在 SyncRequestProcessor中批量处理 // todo logStream == BufferedOutputStream if (logStream==null) { if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } // todo 关联上 我们指定的logdir位置的日志文件 logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); // todo 包装进文件输出流 fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); } filePadding.padFile(fos.getChannel()); byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf); return true; }
FinalRequestProcessor#
终于来到了FinalRequestProcessor
处理器,它并不是线程类,但是它确实是和前两个线程类并列的,单机模式下最后一个处理器类
它处理request的逻辑那是相当长我挑着贴在下面,只是关注下面的几个点,代码并不完整哦
它的解释我写在源码的下面
public void processRequest(Request request) { ProcessTxnResult rc = null; // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 它在消费 outstandingChanges 队列, 没错,这个队列中对象, 就是第一个个处理器调用addChange()方法添加进去的record // 看一看!!!!!!!!! // 看一看!!!!!!!!! // 看一看!!!!!!!!! synchronized (zks.outstandingChanges) { // todo outstandingChanges不为空且首个元素的zxid小于等于请求的zxid while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get(0).zxid <= request.zxid) { //todo 移除并返回第一个元素 ChangeRecord cr = zks.outstandingChanges.remove(0); // todo 如果record的zxid < request.zxid 警告 if (cr.zxid < request.zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + request.zxid); } // todo 根据路径得到Record并判断是否为cr if (zks.outstandingChangesForPath.get(cr.path) == cr) { // 移除cr的路径对应的记录 zks.outstandingChangesForPath.remove(cr.path); } } //todo 请求头不为空 if (request.hdr != null) { // 获取请求头 TxnHeader hdr = request.hdr; // 获取事务 Record txn = request.txn; // todo 跟进这个方法-----<--!!!!!!-----处理事务的逻辑,在这里面有向客户端发送事件的逻辑, 回调客户端的watcher----!!!!!!--> // todo 在这个方法里面更新了内存 rc = zks.processTxn(hdr, txn); } // do not add non quorum packets to the queue. // todo 只将quorum包(事务性请求)添加进队列 if (Request.isQuorum(request.type)) { zks.getZKDatabase().addCommittedProposal(request); } } if (request.cnxn == null) { return; } ServerCnxn cnxn = request.cnxn; String lastOp = "NA"; zks.decInProcess(); Code err = Code.OK; Record rsp = null; boolean closeSession = false; // todo 根据请求头的不同类型进行不同的处理 switch (request.type) { //todo PING case OpCode.ping: { //todo 更新延迟 zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; //todo 更新响应的状态 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime()); cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; } . . . // todo 如果是create , 在这里返回给客户端 结果 case OpCode.create: { lastOp = "CREA"; rsp = new CreateResponse(rc.path); // todo 在下面代码的最后 返回出去 rsp err = Code.get(rc.err); break; } long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime()); // todo 在这里将向客户端返回信息, 跟进去查看就能看到socket相关的内容 cnxn.sendResponse(hdr, rsp, "response");
- 第一点,更新内存在内存DataTree中创建新的节点,回调watcher
rc = zks.processTxn(hdr, txn);
- 第二点响应客户端
cnxn.sendResponse(hdr, rsp, "response");