The Client's Blocking Wait -- a "Spin Lock"#
Step into submitRequest():
```java
// todo This lives in the ClientCnxn class: it submits the request and ultimately hands it to the socket.
// todo It returns a ReplyHeader, which is how we judge whether an error occurred.
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // todo queuePacket() (shown below) is the user input -> String -> request -> Packet step
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);
    // todo Block inside a synchronized block until a response arrives. The finished flag
    // todo is set after the client receives the server's response, parses it, and fills in
    // todo the packet that was waiting in the pendingQueue.
    synchronized (packet) {
        while (!packet.finished) {
            // todo This wait() must be explicitly woken up
            packet.wait();
        }
    }
    // todo The method only returns once the block above has been woken up
    return r;
}
```
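The mechanism here is the classic guarded-wait idiom. Below is a minimal, self-contained sketch of that pattern (the class and names are mine, not ZooKeeper's) showing how one thread parks on a flag while another thread flips it and calls notifyAll():

```java
// A minimal sketch (hypothetical names) of the guarded-wait idiom submitRequest() uses.
class GuardedWaitDemo {
    static class Packet {
        boolean finished = false;
    }

    public static void main(String[] args) throws InterruptedException {
        Packet packet = new Packet();

        // Plays the role of the SendThread: "finishes" the packet after 1s
        new Thread(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            synchronized (packet) {
                packet.finished = true;  // what finishPacket() does
                packet.notifyAll();      // wake the caller blocked in packet.wait()
            }
        }).start();

        // Plays the role of submitRequest(): the while loop re-checks the condition
        // after every wakeup, which guards against spurious wakeups
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        System.out.println("response received, submitRequest() returns");
    }
}
```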
Back in submitRequest(): the client blocks in while (!packet.finished) { packet.wait(); }. Strictly speaking this is a guarded wait rather than a busy spin -- wait() releases the monitor and sleeps, and the while loop re-checks the flag on every wakeup. We just saw the user's command get wrapped into a request; next, in queuePacket(h, r, request, response, null, null, null, null, watchRegistration), it is wrapped into a Packet and added to the outgoingQueue. The source is as follows:
```java
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration) {
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet.
    // It is generated later at send-time, by an implementation of
    // ClientCnxnSocket::doIO(), where the packet is actually sent.
    // todo The Xid is assigned later, inside ClientCnxnSocket::doIO(),
    // todo at the point where the packet is actually sent.
    synchronized (outgoingQueue) {
        // todo Wrap the user's data into a Packet
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            // todo If the client is sending a close-session request, mark closing = true
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // todo Add the packet to the queue.
            // todo When is it consumed? In the SendThread's infinite loop -- that is the second thread.
            outgoingQueue.add(packet);
        }
    }
    // todo getClientCnxnSocket() returns the ClientCnxnSocket; wakeupCnxn() is an abstract
    // todo method on it, implemented by ClientCnxnSocketNIO. It wakes the thread blocked on
    // todo selector.select() so it can promptly do other work -- here, letting the
    // todo SendThread go consume the packet.
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
```
The very last line of this method is the key: wakeupCnxn() boils down to selector.wakeup(), which tells the selector to stop blocking in select() and go handle other work.
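To see what wakeupCnxn() relies on, here is a minimal sketch (a hypothetical demo, not ZooKeeper code) of Selector.wakeup() unblocking a select() that is sleeping on another thread:

```java
import java.nio.channels.Selector;

// Selector.wakeup() makes a select() blocked on another thread return at once.
public class WakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        Thread ioThread = new Thread(() -> {
            try {
                // Blocks for up to 10s -- like doTransport()'s selector.select(waitTimeOut)
                int ready = selector.select(10_000);
                System.out.println("select() returned " + ready + " ready keys");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(500);
        // Another thread (playing the role of queuePacket()) wakes the selector
        // so the I/O thread can immediately go drain the outgoingQueue.
        selector.wakeup();
        ioThread.join();
        selector.close();
    }
}
```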
That matters because the selector has a blocking call inside the SendThread's doTransport() method. Here is that code again:
Like the NIO socket class on the server side, ClientCnxnSocket is a wrapper class held by the ClientCnxn context, and so is SendThread -- which is why each can reach the other.
Looking at it again, waking the selector "to do other work" really means letting it reach doIO(). I pasted that method earlier: it splits into two big parts, read-ready and write-ready.
```java
// todo Send packets to the server -- from here on it is standard NIO network programming
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue,
        LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    // todo The selector blocks polling for at most waitTimeOut (the previously computed timeout)
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    // todo Get the keys returned when the channel was registered with the selector
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is non blocking,
    // so time is effectively a constant. That is why we just have to do this once, here.
    // todo Everything until we return to select() is non-blocking, so time is effectively
    // todo constant -- which is why updateNow() only needs to be called once, here.
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // todo Connection-establishment logic
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // todo The logic that sends data to (and reads data from) the server
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
}
```
At this point the whole flow has been traced end to end. Next, let's look again at how the SendThread consumes a packet, obtains the server's response, and modifies packet.finished -- remember, the main thread is still blocked in submitRequest().
Writing to the Server#
The client-side socket implementation is ClientCnxnSocketNIO. Its write logic is shown below, and it is plain Java NIO: sock.write(p.bb) sends the bytes to the server. The interesting part is what happens afterwards -- pendingQueue.add(p): the packet that was just written is kept in the pendingQueue.
```java
if (sockKey.isWritable()) {
    synchronized (outgoingQueue) {
        // todo Find a packet that can be sent
        Packet p = findSendablePacket(outgoingQueue,
                cnxn.sendThread.clientTunneledAuthenticationInProgress());
        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            // todo Write the packet's ByteBuffer to the server
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) { // todo no bytes remaining -- the packet was fully written
                sentCount++;
                // todo Remove the sent packet from the outgoingQueue
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        // todo If the request header is not null, ping, or auth,
                        // todo add the packet to the pendingQueue.
                        /*
                         * These are the packets that have been sent and
                         * are waiting for a response.
                         */
                        pendingQueue.add(p);
                    }
                }
            }
        }
    }
}
```
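To make the role of the pendingQueue concrete, here is a small sketch of the idea (the class and method names are mine, not ZooKeeper's): because packets are written in order, the first response that comes back must belong to the packet at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// A hypothetical sketch of FIFO request/response correlation via a pending queue.
class PendingDemo {
    static class Packet {
        final int xid;
        boolean finished;
        Packet(int xid) { this.xid = xid; }
    }

    private final Queue<Packet> pendingQueue = new ArrayDeque<>();

    // Writer thread: after a packet is fully written, remember it here.
    synchronized void sent(Packet p) {
        pendingQueue.add(p);
    }

    // Reader thread: a response arrived; it must match the oldest pending packet.
    synchronized void responseArrived(int responseXid) {
        Packet p = pendingQueue.remove();
        if (p.xid != responseXid) {
            throw new IllegalStateException("Xid out of order: got " + responseXid
                    + ", expected " + p.xid);
        }
        synchronized (p) {
            p.finished = true;   // the same flag submitRequest() loops on
            p.notifyAll();       // wake the caller blocked in packet.wait()
        }
    }
}
```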
So why keep a copy of a packet that has already been sent? Same reason as before: the main thread is still blocked because packet.finished has not yet been changed. When does it change? After the server's response is received. And where is that received? In the read-ready branch of doIO(). The source is below; my explanation follows the code.
Reading from the Server#
```java
if (sockKey.isReadable()) {
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely server has closed socket");
    }
    if (!incomingBuffer.hasRemaining()) {
        // todo Flip the buffer so it can be read from
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            recvCount++;
            readLength();
        } else if (!initialized) {
            // todo The connection is not initialized yet (initialized was set to false
            // todo before we got here); read the server's reply to our connect request
            readConnectResult(); // primeConnection()
            enableRead();
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                // Since SASL authentication has completed (if client is configured to do so),
                // outgoing packets waiting in the outgoingQueue can now be sent.
                enableWrite();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
            initialized = true;
        } else {
            // todo Already initialized -- read the response here; step in
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}
```
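The lenBuffer / incomingBuffer dance above is a two-phase, length-prefixed read. Here is a minimal sketch of that protocol, with hypothetical names standing in for the real ClientCnxnSocketNIO fields:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Phase 1: read a 4-byte length. Phase 2: allocate that many bytes and read the payload.
class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    /** Returns the payload once a whole frame has arrived, else null. */
    ByteBuffer readFrame(SocketChannel sock) throws IOException {
        if (sock.read(incomingBuffer) < 0) {
            throw new IOException("server closed the socket");
        }
        if (incomingBuffer.hasRemaining()) {
            return null; // partial read; wait for the next OP_READ
        }
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            // Phase 1 done: the 4 bytes are the payload length (what readLength() does)
            incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
            lenBuffer.clear();
            return null;
        }
        // Phase 2 done: hand the payload off (what sendThread.readResponse() consumes)
        ByteBuffer payload = incomingBuffer;
        incomingBuffer = lenBuffer;
        return payload;
    }
}
```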
As the last part of the code above shows, sendThread.readResponse(incomingBuffer) is called. Its source is below. It first deserializes the data the server sent from the buffer, parses it, copies the result into the matching packet from the pendingQueue, and, at the very end of the method, finally changes the packet's state.
```java
// todo Also a SendThread method: read the response.
// todo The incomingBuffer passed in has already been flip()ed, so it is readable.
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // todo ---- Deserialize a ReplyHeader from the buffer the server wrote back ----
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");
    // todo ---------------------------------------------------------------------

    // todo Below, the ReplyHeader's xid tells us what kind of response this is
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId) + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path " + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
        }
        eventThread.queueEvent(we);
        return;
    }

    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (clientTunneledAuthenticationInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
        return;
    }

    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // todo Take the first packet off the pendingQueue
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response to the first request!
     * todo Requests sit in the queue in order, so the response must match the head packet
     */
    try {
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
                    + " with err " + +replyHdr.getErr()
                    + " expected Xid " + packet.requestHeader.getXid()
                    + " for a packet with details: " + packet);
        }
        // todo Copy the result parsed from the server into the packet from the pendingQueue
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId)
                    + ", packet:: " + packet);
        }
    } finally {
        // todo Step into this method
        finishPacket(packet);
    }
}
```
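To summarize the dispatch at the top of readResponse(), these are the reserved xid values it checks for. The constant names below are my own labels; the numeric values come straight from the code above:

```java
// Reserved xids that identify responses with no matching pendingQueue entry.
final class ReservedXids {
    static final int PING_XID         = -2; // reply to a client ping; logged and dropped
    static final int AUTH_XID         = -4; // reply to an AuthPacket; may set state to AUTH_FAILED
    static final int NOTIFICATION_XID = -1; // server-initiated watch event, routed to the EventThread
    // Any other xid must match the head of the pendingQueue,
    // otherwise the connection is treated as lost ("Xid out of order").
}
```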
Releasing the Client's Blocked State#
Step into finishPacket(packet):
```java
// todo Back in ClientCnxn: this is what wakes the wait() that followed the user's
// todo submitted command -- finished = true lets the original while loop exit.
private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }
    // todo Wake up the thread blocked after submitRequest(); the response has arrived
    if (p.cb == null) {
        synchronized (p) {
            // todo Synchronous call: flip finished to true and notify the waiting thread
            p.finished = true;
            p.notifyAll();
        }
    } else {
        // todo Asynchronous call: hand the packet to the EventThread,
        // todo which will invoke the callback
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
```
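The p.cb == null branch is what separates ZooKeeper's synchronous API from its asynchronous one at the user level. A usage sketch (the connection string and znode path are placeholders):

```java
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SyncVsAsync {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, null);

        // Synchronous: p.cb == null -> the calling thread parks in packet.wait()
        // until finishPacket() calls notifyAll()
        Stat stat = new Stat();
        byte[] data = zk.getData("/demo", false, stat);

        // Asynchronous: p.cb != null -> finishPacket() queues the packet to the
        // EventThread, which invokes this DataCallback later
        zk.getData("/demo", false,
                (rc, path, ctx, d, s) -> System.out.println("async rc=" + rc),
                null);

        zk.close();
    }
}
```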