深入理解 ZooKeeper单机客户端的启动流程(一)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 2020-0208 补充整个过程的流程图

2020-0208 补充整个过程的流程图



客户端的启动流程#



看上面的客户端启动的脚本图,可以看到,zookeeper客户端脚本运行的入口ZookeeperMain.java的main()方法, 关于这个类可以理解成它是程序启动的辅助类,由它提供开始的位置,进而加载出zk client的上下文


创建ZooKeeperMain对象#



// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
    // todo new ZK客户端
    ZooKeeperMain main = new ZooKeeperMain(args);
    // todo run方法的实现在下面
    main.run();
}


跟踪ZooKeeperMain main = new ZooKeeperMain(args); 能往下追很长的代码,提前说main.run()的作用,就是对用户输入的命令进行下一步处理


如上是入口函数的位置,跟进这两个函数,可以找到我们在client端的命令行中可以输入命令和zookeeper服务端进行通信的原因(开起了新的线程),以及zookeeper的客户端所依赖的其他类


跟进ZooKeeperMain main = new ZooKeeperMain(args);


public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 连接到客户端
    connectToZK(cl.getOption("server"));
    }


我们在命令行启动客户端时,输入命令zkCli.sh -server localhost:2181,其中的args数组, 就是我们在启动就是我们输入的参数,


构建zookeeperMain对象时,上面主要做了两件事


  • 解析args参数数组
  • 连接客户端


解析参数数组的逻辑就在下面, 很熟悉,就是我们在命令行启动zookeeper时输入的命令可选项


public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();
    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }
        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}


创建ZooKeeper客户端的对象#



接着看如果连接客户端, connectToZK(String newHost) 同样是本类方法,源码如下:


// todo 来到这里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }
    //todo  命令行中的server 后面跟着 host主机地址
    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    // todo 创建zookeeper的实例
    zk = new ZooKeeper(host,
                        Integer.parseInt(cl.getOption("timeout")),
                        new MyWatcher(), readOnly);
}


到这里算是个小高潮吧,毕竟看到了zookeeper client的封装类ZooKeeper, 这个类上的注解大概是这么介绍这个类的

  • 它是个Zookeeper 客户端的封装类, 它的第一个参数是 host:port,host:port,host:port这种格式的字符串,逗号左右是不同的服务端的地址
  • 会异步的创建session,通常这个session在构造函数执行完之间就已经创建完成了
  • watcher 是监听者,它被通知的时刻不确定,可能是构造方法执行完成前,也可能在这之后
  • 只要没有连接成功, zookeeper客户端,会一直尝试从提供的服务地址串中选择出一个尝试链接


跟进ZooKeeper的构造方法


public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    watchManager.defaultWatcher = watcher;
    // todo 包装服务端的地址
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    //todo 将服务端的地址封装进 StaticHostProvider -> HostProvider中
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
    // todo 创建客户端的上下文, 这个上下文对象的亮点就是它维护了一个客户端的socket
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            // todo 跟进这个方法,getClientCnxnSocket, 获取出客户端上下文中的socket
            getClientCnxnSocket(), canBeReadOnly);
    // todo 启动客户端
    cnxn.start();
}


主要做了这么几件事

  • 将服务端的地址解析封装进了StaticHostProvider类中, 可以把这个类理解成专门存放服务端地址的set 集合
  • 创建出了客户端的上下文对象: ClientCnxn, 当然在这之前,入参位置还有一个getClientCnxnSocket()这个函数可以创建出客户端的NIO Socket
  • 然后调用cnxn.start() 其实就是启动了客户端的另外两条线程sendThreadeventThread 下面会详细说


创建客户端的 NioSocket#



继续跟进源码getClientCnxnSocket()通过反射,zk客户端使用的socket对象是ClientCnxnSocketNIO


//todo 通过反射创建出客户端上下文中的 socket , 实际的ClientCnxnSocketNIO 是 ClientCnxnSocket的子类
    // todo --->  zookeeper 封装的 NIO的逻辑都在   实际的ClientCnxnSocketNIO
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        // todo zookeeper.clientCnxnSocket
        String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            // todo 上面String其实就是这个类的name, 根进去看一下它的属性
            // todo 这个类维护了NioSocket使用到的 selector 选择器 , 已经发生的感兴趣的事件SelectionKey
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            // todo 可以看到客户端使用的 NioSocket
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }


创建 ClientCnxn客户端的上下文#



创建上下文,构造函数中的诸多属性都是在前面读取配置文件或是新添加进来的,重点是最后两行,它创建了两条线程类,和zk客户端的IO息息相关


public   ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId; // todo 刚才传递过来的值为0
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        connectTimeout = sessionTimeout / hostProvider.size();
        // todo 添加read的超时时间
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        // todo  创建了一个seadThread 线程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }


创建SendThread#



sendThred是一个客户端的线程类,什么时候开启? 其实就在上面,当创建了ClientCnxn后,调用的cnxn.start()就是在开启它的run() , 它有什么作用? 它的run()是一个无限循环,除非运到了close的条件,否则他就会一直循环下去, 比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应


这是他的构造方法,可以看到它还是一个守护线程,并拥有客户端socket的引用,有了NIO Socket相关技能


//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    // todo 设置状态 Connecting
    state = States.CONNECTING;
    // todo 就是在 Zookeeper new ClientCnxn 时, 在倒数第二个位置使传递进去一个函数实际的
    this.clientCnxnSocket = clientCnxnSocket;
    // todo 设置成守护线程
    setDaemon(true);
}


它的Run方法, 真的是好长啊, 比我上面写的部分内容还长(大概两百行了), 大概它的流程 ,每次循环:

  • 检查一下客户端的socket有没有和服务端的socket建立连接
  • 没有建立连接
  • 尝试选出其他的server地址进行连接
  • 如果满足close的条件,直接break 跳出整个while循环
  • 如果已经建立了连接
  • 计算 to = 读取的超时时间 - 服务端的响应时间
  • 未连接的状态
  • 计算 to = 连接超时时间 - 服务端的响应时间
  • 上面的两个to, 如果小于0, 说明客户端和服务端通信出现了异常, 很可能是server的session time out,于是抛出异常
  • 如果连接状态是健康的,向服务端发送心跳
  • clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服务端发送数据


在这个负责和服务端进行IO操作的线程中,只要不是close或其他重大错误,一般可以预知的异常都有try起来,然后记录日志,并没有其他操作,循环还是会进行


// todo introduce 介绍
    clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // todo 这个while循环中存在建立连接的过程, 已经连接建立失败后不断重试的过程
    //todo  state.isAlive() 默认是 NOT_CONNECTED
    while (state.isAlive()) {
        try {
//todo 1111  如果socket还没有连接 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            //todo  如果socket还没有连接
            if (!clientCnxnSocket.isConnected()) {
                // todo 判断是不是第一次连接, 如果不是第一次进入下面try代码块, 随机产生一个小于一秒的时间
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                // todo 如果是closing 或者 已经关闭了, 直接退出这个循环
                if (closing || !state.isAlive()) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // todo 连接失败时,来这里重试连接
                    // todo 从我们传递进来的host地址中选择一个地址
                    serverAddress = hostProvider.next(1000);
                }
                // todo client和server进行socket连接
                // todo  跟进去 ,实现逻辑在上面
                // todo  这个方法开始建立连接,并将 isFasterConnect改成了 false
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }
 //todo  2222 如果socket处于连接状态 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            // todo 下面的连接状态
            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                           LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }
                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                // todo  连接成功的话执行to 为下面值
                // todo  to = 读取的超时时间 -  上一次的读取时间
                // todo 如果预订的超时时间 - 上次读的时间 <= 0 说明超时了
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                // todo 如果没有连接成功, 就会来到这里, 给 to 赋值
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
 //todo  3333 异常处理 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            // todo 下面抛出来了异常
            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                    + clientCnxnSocket.getIdleRecv()
                    + "ms"
                    + " for sessionid 0x"
                    + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                // todo 这里抛出来了异常, 下面的try 就会把它抓住
                throw new SessionTimeoutException(warnInfo);
            }
//todo  44444 连接成功执行的逻辑 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            // todo 下面的是连接成功执行的逻辑
            if (state.isConnected()) {
                // todo  为了防止竞争状态丢失发送第二个ping, 同时也避免出现很多的ping
              //1000(1 second) is to prevent(阻止) race condition missing to send the second ping
              //also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    // todo 客户端一直在这里循环, 如果连接成功的话, 每次循环都来到这个逻辑这里发送 ping
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            // If we are in read-only mode, seek for read/write server
            // todo 只读状态 相关逻辑
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
//todo  66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////
            // todo 消费outgoingqueue, 完成向服务端的发送发送
            // todo doTransport 是 ClientCnxnSocket 的抽象方法, 实现类clientCnxnSocketNio
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            // todo 在这个try中处理里面的抛出来的异常
            if (closing) {
                // todo 如果是请求关闭, 直接退出 break 出while循环
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // todo 只要不是退出异常, 下面的异常都是仅仅打印了一下出现了什么异常
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else if (e instanceof SocketException) {
                    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                } else {
                    LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                    Long.toHexString(getSessionId()),
                                    serverAddress,
                                    RETRY_CONN_MSG,
                                    e);
                }
                // todo 这个方法中, isFirstConnect = true
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    } // todo while循环的结束符号 , 这是个while循环, 除了上面的close其他异常都会继续循环, 接着上去再看一遍
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
                   + Long.toHexString(getSessionId()));
}


在上面这个200行的Run方法中比较值得注意的几个方法如下

  • 如果做到下次选出一个非当前server的地址

针对下标运行,对数组的size取模, 再赋值给自己,所以就实现了从0 - array.size()的循环


public InetSocketAddress next(long spinDelay) {
        currentIndex = ++currentIndex % serverAddresses.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }


  • 如果检查到了没有连接的话,就是用clientCnxnSocket进行连接

这个函数中,将标记是否是第一次连接的标记置为了flase, 并且拿到了sessionid


// todo 保证连接的逻辑
        void primeConnection() throws IOException {
            LOG.info("Socket connection established to "
                     + clientCnxnSocket.getRemoteSocketAddress()
                     + ", initiating session");
            isFirstConnect = false;
            //todo  创建了一个建立连接的request, 并且在下面将它添加进来 outgoingqueue
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
            ... 如watcher 相关的逻辑


SendThread 和 服务端的IO沟通#


跟进上面Run方法的如下方法,doTranprot


clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);


他是本类的抽象方法,具体的实现类是clientCnxnSocketNIO

跟进这个方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);


for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // todo 建立连接的逻辑
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // todo 往服务端发送数据的逻辑 , 方法在上面的64行
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }


  • DoIo的源码如下

它分成了两大模块

  • 读就绪, 读取服务端发送过来的数据
  • 写就绪, 往客户端发送用户在控制台输入的命令


void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        // todo 通过key获取服务端的channel
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // TODO 读就绪
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                // todo 返回buffer
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) { //todo 连接有没有初始化, 来这之前被改成了 flase ,现在
                    // todo 读取服务端发给我的连接请求的结果
                    readConnectResult(); // primeConnection()
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    //todo 如果已经初始化了, 就来这里读取响应, 跟进去
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        //todo 写就绪
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                // todo 查询出可发送的packet
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }
                    // todo 往服务端发送数据 packet.ByteBuf
                    sock.write(p.bb); // 发送服务端
                    if (!p.bb.hasRemaining()) { //todo !hasRemaining  没有剩余的数据
                        sentCount++;
                        // todo 将发送过的packet从outgoingqueue移除
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                // todo 如果刚才的请求头的类型不是null , 不是ping ,不是权限验证 就把packet添加到 pendingQueue
                                /**
                                 * These are the packets that have been sent and are waiting for a response.
                                 * todo 这个penddingQueue 存放的是已经发送的 和 等待服务器响应的packet
                                 */
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                  e.html
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
11天前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
25 11
|
18天前
|
存储
ZooKeeper客户端常用命令
ZooKeeper客户端常用命令
25 1
|
2月前
|
算法 Java Linux
zookeeper单机伪集群集群部署
zookeeper单机伪集群集群部署
86 0
|
4月前
|
安全 Java API
Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
2,/usr/local/data/zookeeper-3,/usr/local/data/zookeeper-4,在每个目录中创建文件。创建四个文件夹/usr/local/data/zookeeper-1,/usr/local/data/zookeeper-Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点。己对外提供服务的起始状态。E: 角色, 默认是 participant,即参与过半机制的角色,选举,事务请求过半提交,还有一个是。
|
4月前
Zookeeper的客户端的命令
Zookeeper的客户端的命令
18 0
|
4月前
|
缓存 Java API
Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。
|
4月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
46 2
|
5月前
|
存储 设计模式 算法
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
137 1
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
|
19天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
32 2
|
4月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0