zookeeper - client介绍(12)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 概述 先打一个预告,写完这篇zookeeper-client端的文章以后,基本上zookeeper的系列已经完结了,后面会进入flume系列的源码阅读当中,敬请期待,而这篇文章也是上周去听分享时候砸场子答应别人写的文章。

概述

 先打一个预告,写完这篇zookeeper-client端的文章以后,基本上zookeeper的系列已经完结了,后面会进入flume系列的源码阅读当中,敬请期待,而这篇文章也是上周去听分享时候砸场子答应别人写的文章。
 zookeeper-client端主要介绍的内容包括client的核心属性,client-server的交互过程(以getData作为一个例子),client-watcher的触发过程,client的重连过程。至于client-server建立连接的过程,建议看下《zookeeper - 主从通信(3)》。


zookeeper-client

zookeeper-client核心属性如下

  • ZKWatchManager:用于client端watcher注册的核心数据结构
  • ExistsWatchRegistration:用于exists方法注册的watcher事件
  • DataWatchRegistration:用于getData方法注册的watcher事件
  • ChildWatchRegistration:用于getChildren方法注册watcher事件
  • ClientCnxn:用于维持client和server端的连接

ClientCnxn的核心属性

  • SendThread:用于处理client和server的读写操作
  • EventThread:用于处理server端发回来的event事件
  • outgoingQueue:用于保存待发送的packet的queue
  • pendingQueue:用于保存已发送的packet的queue并用来业务处理


sendThread介绍

 sendThread主要用于client-server的发送和接收的交互过程,这里我们以getData作为一个例子进行分析,通过getData的分析基本上我们能够了解清楚client的发送过程。
 核心点:1、创建DataWatchRegistration对象,后面会注册到ZKWatchManager当中;2、通过queuePacket发送发送报文。

public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            //todo 核心点在于我们把watcher注册进去了
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
    }



 queuePacket动作负责将packet加入到outgoingQueue当中,然后通过packetAdded方法唤醒sendThread发送packet。

  public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of         ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        //todo 这里的Packet包含watchRegistration对象
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                //todo 核心点在于每次发送报文的时候放置到outgoingQueue
                outgoingQueue.add(packet);
            }
        }
        //todo 这里负责唤醒sendThread线程进行send操作
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }



 SendThread的核心run代码,真正执行的过程在doTransport当中,这里可以看到我们传进去的pendingQueue,用于每次从outgoingQueue取的报文发送的同时保存到pendingQueue便于处理server发回的响应,所有的核心操作都在doTransport的方法当中。

public void run() {
            while (state.isAlive()) {
                try {
                    //todo 处理没连接的情况就开始执行连接
                    if (!clientCnxnSocket.isConnected()) {
                    }

                    //todo 处理已经连接的情况
                    if (state.isConnected()) {
                    }

                    //todo 处理已经连接的情况
                    if (state.isConnected()) {
                    }

                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                    }

                    //todo 真正核心的逻辑代码,处理连接事件
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
            }



 doTransport中通过selector操作进行处理各种nio事件,我们重点关注SelectionKey.OP_READ | SelectionKey.OP_WRITE事件,进入doIO处理操作。

void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {

        //todo 通过selector的监听获取数据
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        
        //todo 这里在更新时间
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());

            //todo 如果是处理连接事件,异步连接成功
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                //todo 处理读写事件
                doIO(pendingQueue, cnxn);
            }
        }

        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue,
                    sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }

        selected.clear();
    }



 doIO过程主要包括read/write两个过程,write的过程从outgoingQueue中获取一个packet发送并保存到pendingQueue当中;read的过程先读取报文的长度再读取报文内容,然后进行sendThread.readResponse处理。

void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }

        //todo 处理读事件
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }

            //todo 这里肯定不会有空的,因为我们按照长度去读取的,每次先读取长度,然后可能是初始化的逻辑,可能是实际报文的内容
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                //todo 这里设计如果相等说明我们读取的是长度
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    //todo 说明还没有初始化,这个地方也是也是先读取长度的
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    //todo 读取实际长度
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }

        //todo 处理写事件
        if (sockKey.isWritable()) {
            Packet p = findSendablePacket(outgoingQueue,
                    sendThread.tunnelAuthInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }

                    //todo 创建写报文
                    p.createBB();
                }
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    outgoingQueue.removeFirstOccurrence(p);

                    //todo 如果是业务请求,则添加到Pending队列,方便对server端返回做相应处理,如果是其他请求,发完就扔了
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }



 readResponse的处理过程当中,负责处理ping报文、AuthPacket报文、event事件、数据报文,这里着重关注报文处理和事件处理的两个过程,其中报文处理我们跟踪finishPacket方法,针对事件处理我们关注event的反序列化过程以及通过eventThread.queueEvent过程加入到eventThread当中。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");

            //todo 这里处理是pings的响应
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                return;
            }

            //todo 这里处理AuthPacket报文
            if (replyHdr.getXid() == -4) {           
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );                                      
                }
                return;
            }

            //todo 这里触发的应该就是event事件的到来
            if (replyHdr.getXid() == -1) {
                // -1 means notification
               //todo 我们从收到的报文中解压出WatcherEvent对象
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                //todo WatchedEvent这里根据event重新构建了要给event放入到eventThread队列中
                WatchedEvent we = new WatchedEvent(event);
                eventThread.queueEvent( we );
                return;
            }

            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

            //todo 这里在处理报文的响应
            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                //todo 这里我们看到了从pendingQueue取已发送的报文
                packet = pendingQueue.remove();
            }
            try {
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
            } finally {
                //todo 继续处理报文格式
                finishPacket(packet);
            }
        }



 报文处理的后续后续处理步骤,这里有一个核心步骤p.watchRegistration.register,这里会把我们通过getData方法传入的watcher对象注册到ZKWatchManager当中。

private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        //todo 这里根据server端返回的错误码将watcher添加到watchRegistration当中
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                        .entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        queueEvent(p.watchDeregistration.getClientPath(), err,
                                watchers, entry.getKey());
                        // ignore connectionloss when removing from local
                        // session
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (KeeperException.NoWatcherException nwe) {
                p.replyHeader.setErr(nwe.code().intValue());
            } catch (KeeperException ke) {
                p.replyHeader.setErr(ke.code().intValue());
            }
        }

        //todo 异步AsyncCallback
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            //todo 异步创建节点时候的回调机制
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }



 register的过程中的getWatches会从ExistsWatchRegistration、DataWatchRegistration、ChildWatchRegistration当中,watches就是不同的Registration保存的watcher对象。eventThread当中取得watcher然后执行回调。

public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }



 eventThread.queueEvent的过程,这里我们走的是materialized方法中Watchers == null的分支,这里我们需要进一步关注materialize方法。

public void queueEvent(WatchedEvent event) {
            queueEvent(event, null);
        }

        private void queueEvent(WatchedEvent event,
                Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event
                //todo 这里核心走的分支,我们分析只走这个分支
                watchers = watcher.materialize(event.getState(),
                        event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }



 materialize的过程中我们以NodeDataChanged事件为例,我们把在finishPackage中注册的watcher(就是保存在dataWatches数据结构中),保存到result结果当中返回并触发这些watcher。

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
            case None:
                result.add(defaultWatcher);
                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
            case NodeDataChanged:
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(list, result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }



 watcherEvent的数据结构,、属性中只包括type、state、path,不包括具体的数据,不包括具体的数据,不包括具体的数据,重要的事情说三遍哦。

public class WatcherEvent implements Record {
  private int type;
  private int state;
  private String path;
  public WatcherEvent() {
  }
  public WatcherEvent(
        int type,
        int state,
        String path) {
    this.type=type;
    this.state=state;
    this.path=path;
  }
  public int getType() {
    return type;
  }
  public void setType(int m_) {
    type=m_;
  }
  public int getState() {
    return state;
  }
  public void setState(int m_) {
    state=m_;
  }
  public String getPath() {
    return path;
  }
  public void setPath(String m_) {
    path=m_;
  }
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(type,"type");
    a_.writeInt(state,"state");
    a_.writeString(path,"path");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    type=a_.readInt("type");
    state=a_.readInt("state");
    path=a_.readString("path");
    a_.endRecord(tag);
}


eventTread介绍

 处理事件的入口,eventThread负责处理在sendThread中往waitingEvents队列当中放置的event事件。

public void run() {
           try {
              isRunning = true;
              while (true) {
                  //todo 从队列中取出待处理事件报文
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    //todo 处理事件报文
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } 



 event处理过程processEvent主要处理watcher、处理异步处理事件、处理response报文,处理watcher事件的最后代码watcher.process(pair.event),这里就已经执行到了各个watcher的回调代码中了。

private void processEvent(Object event) {
          try {
              if (event instanceof WatcherSetEventPair) {
                  //todo 处理watch事件
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
                } else if (event instanceof LocalCallback) {

                    //todo,处理异步处理回调
                    LocalCallback lcb = (LocalCallback) event;
                    if (lcb.cb instanceof StatCallback) {
                        ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                                lcb.ctx, null);
                    } else if (lcb.cb instanceof DataCallback) {
                        ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                                lcb.ctx, null, null);
                    } else if (lcb.cb instanceof ACLCallback) {
                        ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                                lcb.ctx, null, null);
                    } else if (lcb.cb instanceof ChildrenCallback) {
                        ((ChildrenCallback) lcb.cb).processResult(lcb.rc,
                                lcb.path, lcb.ctx, null);
                    } else if (lcb.cb instanceof Children2Callback) {
                        ((Children2Callback) lcb.cb).processResult(lcb.rc,
                                lcb.path, lcb.ctx, null, null);
                    } else if (lcb.cb instanceof StringCallback) {
                        ((StringCallback) lcb.cb).processResult(lcb.rc,
                                lcb.path, lcb.ctx, null);
                    } else {
                        ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                                lcb.ctx);
                    }
                } else {
                  //todo 处理响应报文
                  Packet p = (Packet) event;
                  int rc = 0;
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.cb == null) {
                      LOG.warn("Somehow a null cb got to EventThread!");
                  } else if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
                      StatCallback cb = (StatCallback) p.cb;
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof Create2Response) {
                      Create2Callback cb = (Create2Callback) p.cb;
                      Create2Response rsp = (Create2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }                   
                  } else if (p.response instanceof MultiResponse) {
                      MultiCallback cb = (MultiCallback) p.cb;
                      MultiResponse rsp = (MultiResponse) p.response;
                      if (rc == 0) {
                          List<OpResult> results = rsp.getResultList();
                          int newRc = rc;
                          for (OpResult result : results) {
                              if (result instanceof ErrorResult
                                      && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result)
                                      .getErr())) {
                                  break;
                              }
                          }
                          cb.processResult(newRc, clientPath, p.ctx, results);
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  }  else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }


client重连介绍

 从这里可以看出来,基本上我们通过next获取一个地址进行重连。

public void run() {
           
            while (state.isAlive()) {
                try {

                    //todo 处理没连接的情况就开始执行连接
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }

                        //todo 开始建立正式连接,核心知识点
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
        }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
分布式计算 Java Hadoop
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
|
网络协议 Go
zookeeper go client原理总结
zookeeper go client原理总结
353 0
zookeeper go client原理总结
Zookeeper之Zookeeper的Client的分析
1)几个重要概念  ZooKeeper:客户端入口 Watcher:客户端注册的callback ZooKeeper.SendThread: IO线程 ZooKeeper.EventThread: 事件处理线程,处理各类消息callback ClientCnxnSocketNIO...
1190 0
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
5月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
下一篇
无影云桌面