Understanding ZooKeeper in Depth, Part 3: ZooKeeper Internals and Leader Election Source Code Analysis (Part 2)


Part 1 of this article: https://developer.aliyun.com/article/1413768


How is this achieved? (leader)


If the elected leader has the largest zxid in the whole cluster, its data is the most recent in the cluster. The zxid is therefore directly tied to leader election.


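Each new round of leader election gets its own epoch, and the epoch only ever increases. A zxid is therefore made of two parts: the epoch plus a transaction counter.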


Comparing these two parts is how the design guarantees that the elected leader holds the most recent data.
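
The ordering rule described above can be sketched as a small comparison function. This is a minimal illustration, not ZooKeeper's own code: the class name `VoteOrderSketch` and the method `candidateWins` are hypothetical, and the final tie-break on the server id is an assumption based on how FastLeaderElection is commonly described.

```java
// Hypothetical sketch of the vote ordering; not a ZooKeeper class.
class VoteOrderSketch {

    /**
     * Returns true when the candidate vote (newId, newZxid, newEpoch)
     * should replace the current vote (curId, curZxid, curEpoch).
     * Order of comparison: epoch first, then zxid, then server id (assumed tie-break).
     */
    static boolean candidateWins(long newId, long newZxid, long newEpoch,
                                 long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // a newer election epoch always wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // same epoch: the more recent data wins
        }
        return newId > curId;             // same data: the larger server id wins
    }

    public static void main(String[] args) {
        // Same epoch, candidate has the larger zxid.
        System.out.println(candidateWins(1, 7, 2, 3, 5, 2));
        // Same epoch and zxid, candidate has the smaller server id.
        System.out.println(candidateWins(1, 5, 2, 3, 5, 2));
    }
}
```

Because every node applies the same total order, all nodes converge on the same winner once they have seen the same set of votes.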


zxid (transaction id)


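Every transaction is assigned an id that is unique across the whole cluster. To guarantee that transactions are applied in a consistent order, ZooKeeper marks them with a monotonically increasing transaction id. As mentioned earlier, every proposal increments the zxid.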


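The zxid is a 64-bit number. The high 32 bits hold the epoch (the "clock period" that changes each time the leader changes), and the low 32 bits hold the increasing transaction counter.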
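
The 64-bit layout can be shown with a few lines of bit arithmetic. This is a minimal sketch under the layout stated above; the class `ZxidSketch` and its method names are hypothetical helpers written for this article.

```java
// Hypothetical helper illustrating the zxid layout: high 32 bits epoch, low 32 bits counter.
class ZxidSketch {

    /** Packs an epoch and a transaction counter into one 64-bit zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extracts the epoch from the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extracts the transaction counter from the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        System.out.println("0x" + Long.toHexString(zxid)); // prints 0x300000007
        System.out.println(epochOf(zxid));                 // prints 3
        System.out.println(counterOf(zxid));               // prints 7
    }
}
```

Since the epoch sits in the high bits, a plain numeric comparison of two zxids already orders them by epoch first and by counter second.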


Source analysis: guessing how ZooKeeper starts up


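  • Load the configuration (zoo.cfg)
  • Initialize
  • Open port 2181 (the separate client-facing service) and listen for client requests (zkClient)
  • Start listening on ports 2888 and 3888
  • Initialize leader election
  • Start leader election
  • Load the data from disk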
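// The code below traces the startup flow of an older ZooKeeper release (the original text says 2.7.0, which is not a ZooKeeper version; the code appears to come from the 3.4.x line)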
QuorumPeerMain main = new QuorumPeerMain();
try {
     main.initializeAndRun(args);
--------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
    // Holds the global configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }
    // Start a scheduled task that purges old snapshots and transaction logs
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
    // With a distributed (quorum) configuration, start the quorum peer
        if (args.length == 1 && config.servers.size() > 0) {
            this.runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running  in standalone mode");
            ZooKeeperServerMain.main(args); // otherwise start in standalone mode
        }
    }
--------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException var4) {
            LOG.warn("Unable to register log4j JMX control", var4);
        }
        LOG.info("Starting quorum peer");
        try {
            /*
            ClientCnxn: client-side class that handles network interaction with the server
            ServerCnxn: server-side class that handles network communication
            */
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
            // quorumPeer represents one node of the cluster; everything loaded at startup is configured on it
            this.quorumPeer = new QuorumPeer();
            this.quorumPeer.setClientPortAddress(config.getClientPortAddress());
            this.quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())));
            this.quorumPeer.setQuorumPeers(config.getServers());
            this.quorumPeer.setElectionType(config.getElectionAlg()); // election algorithm
            this.quorumPeer.setMyid(config.getServerId());
            this.quorumPeer.setTickTime(config.getTickTime()); // tick time: the basic heartbeat interval
            this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            this.quorumPeer.setInitLimit(config.getInitLimit()); // ticks a follower may take to connect to the leader and finish its initial sync
            this.quorumPeer.setSyncLimit(config.getSyncLimit()); // ticks a follower may lag behind the leader during synchronization
            this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
            this.quorumPeer.setCnxnFactory(cnxnFactory);
            this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, backed by the data persisted on disk
            this.quorumPeer.setLearnerType(config.getPeerType());
            this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
            this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            this.quorumPeer.start(); // start the peer
            this.quorumPeer.join(); // QuorumPeer is a thread; join() blocks until that thread finishes
        } catch (InterruptedException var3) {
            LOG.warn("Quorum Peer interrupted", var3);
        }
    }
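
The choice between cluster mode and standalone mode depends on whether zoo.cfg defines any `server.N` entries. The sketch below mimics that check with `java.util.Properties`. It is a simplification under stated assumptions: the class `ZooCfgSketch`, the sample host names, and the "count `server.` keys" rule are illustrative and much cruder than what `QuorumPeerConfig` really does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch; the real parsing lives in QuorumPeerConfig.
class ZooCfgSketch {

    /** Parses zoo.cfg-style text (key=value lines) into Properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Counts the server.N entries, i.e. the configured quorum members. */
    static int countServers(Properties cfg) {
        int count = 0;
        for (String key : cfg.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                count++;
            }
        }
        return count;
    }

    /** Simplified version of the check in initializeAndRun: any server entries means cluster mode. */
    static boolean isDistributed(Properties cfg) {
        return countServers(cfg) > 0;
    }

    public static void main(String[] args) {
        // Sample configuration; host1..host3 are placeholder host names.
        String cfg = "tickTime=2000\ninitLimit=10\nsyncLimit=5\nclientPort=2181\n"
                + "server.1=host1:2888:3888\nserver.2=host2:2888:3888\nserver.3=host3:2888:3888\n";
        Properties props = parse(cfg);
        System.out.println(isDistributed(props) ? "cluster mode" : "standalone mode");
    }
}
```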


Leader election: source code and principles


Building the source


  • Build with Maven


Starting the ZooKeeper server


  • Cluster mode: QuorumPeerMain
  • Standalone mode: ZooKeeperServerMain


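Guesses


  • zoo.cfg: load the configuration
  • Listen on 2181 (NIO or Netty; NIO by default, as covered in the TCP protocol course)
  • Set up the listeners on 2888 and 3888 used for election and data synchronization, with blocking I/O (BIO)
  • Initialize the election algorithm and run the election (leader)
  • Load and restore the local files
  • Synchronize data


Source analysis


Based on ZooKeeper 3.6.1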

 public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
---------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // Holds the global configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // args[0] is the zoo.cfg file; parse it and store the result in QuorumPeerConfig
        }
    // Start a scheduled task that purges old snapshots and transaction logs
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        if (args.length == 1 && config.isDistributed()) { // cluster mode
            this.runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
            ZooKeeperServerMain.main(args); // standalone node
        }
    }
---------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException var17) {
            LOG.warn("Unable to register log4j JMX control", var17);
        }
        LOG.info("Starting quorum peer");
        MetricsProvider metricsProvider; // metrics data
    // Publishes the metrics of this ZooKeeper instance
        try {
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException var16) {
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), var16);
        }
        try {
            ServerMetrics.metricsProviderInitialized(metricsProvider);
            /*
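            This part is related to listening on port 2181
            ClientCnxn: client-side class that handles network interaction with the server
            ServerCnxn: server-side class that handles network communication
            */
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;
             // secureCnxnFactory is the one used for secure client connections
            if (config.getClientPortAddress() != null) { // the default case: plain client port
                cnxnFactory = ServerCnxnFactory.createFactory();
                // configure listening on that port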
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }
            this.quorumPeer = this.getQuorumPeer();
            this.quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            this.quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            this.quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            this.quorumPeer.setElectionType(config.getElectionAlg()); // which election algorithm to use
            this.quorumPeer.setMyid(config.getServerId());// myid
            this.quorumPeer.setTickTime(config.getTickTime());// tick time (heartbeat interval), e.g. 2000 ms
            this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            this.quorumPeer.setInitLimit(config.getInitLimit());// ticks a follower may take to connect to the leader and finish its initial sync
            this.quorumPeer.setSyncLimit(config.getSyncLimit());// ticks a follower may lag behind the leader during synchronization
            this.quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            this.quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            this.quorumPeer.setConfigFileName(config.getConfigFilename());
            this.quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, backed by the data persisted on disk
            this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                this.quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            this.quorumPeer.initConfigInZKDatabase();
            this.quorumPeer.setCnxnFactory(cnxnFactory);
            this.quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            this.quorumPeer.setSslQuorum(config.isSslQuorum());
            this.quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            this.quorumPeer.setLearnerType(config.getPeerType());
            this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
            this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                this.quorumPeer.getX509Util().enableCertFileReloading();
            }
            this.quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            this.quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            this.quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
            this.quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (this.quorumPeer.isQuorumSaslAuthEnabled()) {
                this.quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                this.quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                this.quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                this.quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                this.quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            this.quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            this.quorumPeer.initialize();
            if (config.jvmPauseMonitorToRun) {
                this.quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }
            this.quorumPeer.start(); // start the peer
            ZKAuditProvider.addZKStartStopAuditLog();
            this.quorumPeer.join(); // QuorumPeer is a thread; join() blocks until that thread finishes
        } catch (InterruptedException var15) {
            LOG.warn("Quorum Peer interrupted", var15);
        } finally {
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable var14) {
                    LOG.warn("Error while stopping metrics", var14);
                }
            }
        }
    }

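Connecting to ZooKeeper's monitoring data

Once connected, you can browse its metric attributes.

When we create a ZooKeeper node, the in-memory database (ZKDatabase) holds a DataTree object, and that object is where the node data is stored.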

// Stepping into start()
public synchronized void start() { // overrides Thread.start()
        if (!this.getView().containsKey(this.myid)) { // check that this myid appears in the configured cluster membership
            throw new RuntimeException("My id " + this.myid + " not in the peer list");
        } else {
            this.loadDataBase(); // load the data
            this.startServerCnxnFactory();// start the client listener on port 2181
            try {
                this.adminServer.start();
            } catch (AdminServerException var2) {
                LOG.warn("Problem starting AdminServer", var2);
                System.out.println(var2);
            }
            this.startLeaderElection(); // start leader election
            this.startJvmPauseMonitor(); // monitoring-related
            super.start(); // start the thread itself
        }
    }
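
The pattern in this method, overriding `Thread.start()` so that initialization runs on the caller's thread before the new thread begins, can be reproduced in isolation. The sketch below is an illustration only: `PeerThreadSketch` is a hypothetical class, and the strings it records merely stand in for the real initialization steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "initialize in start(), then super.start()" pattern.
class PeerThreadSketch extends Thread {

    // Records the order in which the steps run.
    final List<String> steps = new ArrayList<>();

    @Override
    public synchronized void start() {
        // These run on the calling thread, before the new thread exists.
        steps.add("loadDataBase");
        steps.add("startServerCnxnFactory");
        steps.add("startLeaderElection");
        super.start(); // only now is run() scheduled on the new thread
    }

    @Override
    public void run() {
        steps.add("run"); // stands in for the main loop of the peer
    }

    /** Starts one peer, waits for it to finish (like quorumPeer.join()), and returns the recorded steps. */
    static List<String> runOnce() {
        PeerThreadSketch peer = new PeerThreadSketch();
        peer.start();
        try {
            peer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peer.steps;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [loadDataBase, startServerCnxnFactory, startLeaderElection, run]
    }
}
```

The upside of this design is that a failure during initialization surfaces as an exception to the caller of `start()` instead of being lost inside the new thread.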


startServerCnxnFactory: start listening


---------------------------------------------------------------
// Initialization
if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }
public static ServerCnxnFactory createFactory() throws IOException {
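    // Read a JVM system property (not an OS environment variable)
        String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
        if (serverCnxnFactoryName == null) {
            // default to NIO
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            // load the implementation by class name, whether it is the default or one set through the property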
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
            LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
            return serverCnxnFactory;
        } catch (Exception var3) {
            IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, var3);
            throw ioe;
        }
    }
---------------------------------------------------------------
// configure() is implemented differently by each factory; this is the NIO implementation
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        } else {
            // set up SASL login
            this.configureSaslLogin();
            this.maxClientCnxns = maxcc;
            this.initMaxCnxns();
            this.sessionlessCnxnTimeout = Integer.getInteger("zookeeper.nio.sessionlessCnxnTimeout", 10000);
            this.cnxnExpiryQueue = new ExpiryQueue(this.sessionlessCnxnTimeout);
            this.expirerThread = new NIOServerCnxnFactory.ConnectionExpirerThread();
            int numCores = Runtime.getRuntime().availableProcessors(); // number of processor cores available to the JVM
            this.numSelectorThreads = Integer.getInteger("zookeeper.nio.numSelectorThreads", Math.max((int)Math.sqrt((double)((float)numCores / 2.0F)), 1));
            if (this.numSelectorThreads < 1) {
                throw new IOException("numSelectorThreads must be at least 1");
            } else {
                this.numWorkerThreads = Integer.getInteger("zookeeper.nio.numWorkerThreads", 2 * numCores);
                this.workerShutdownTimeoutMS = Long.getLong("zookeeper.nio.shutdownTimeout", 5000L);
                String logMsg = "Configuring NIO connection handler with " + this.sessionlessCnxnTimeout / 1000 + "s sessionless connection timeout, " + this.numSelectorThreads + " selector thread(s), " + (this.numWorkerThreads > 0 ? this.numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : "" + directBufferBytes / 1024 + " kB direct buffers.");
                LOG.info(logMsg);
                // create the selector threads, using the count computed above
                for(int i = 0; i < this.numSelectorThreads; ++i) {
                    this.selectorThreads.add(new NIOServerCnxnFactory.SelectorThread(i));
                }
                this.listenBacklog = backlog;
                // open a ServerSocketChannel
                this.ss = ServerSocketChannel.open();
                this.ss.socket().setReuseAddress(true);
                LOG.info("binding to port {}", addr);
                if (this.listenBacklog == -1) {
                    this.ss.socket().bind(addr); // bind the listening port
                } else {
                    this.ss.socket().bind(addr, this.listenBacklog);
                }
                this.ss.configureBlocking(false);// non-blocking mode
                this.acceptThread = new NIOServerCnxnFactory.AcceptThread(this.ss, addr, this.selectorThreads);// the accept thread
                /*
                 * AcceptThread accepts incoming client connections.
                 * selectorThreads handle the read/write events reported by their selectors.
                 */
            }
        }
    }
-----------------------------------------------------------------------
private void startServerCnxnFactory() {
        if (this.cnxnFactory != null) {
            this.cnxnFactory.start(); // assigned earlier, during configuration
        }
        if (this.secureCnxnFactory != null) {
            this.secureCnxnFactory.start(); // assigned earlier, during configuration
        }
    }
public void start() {
        this.stopped = false;
        if (this.workerPool == null) {
            this.workerPool = new WorkerService("NIOWorker", this.numWorkerThreads, false);
        }
        Iterator var1 = this.selectorThreads.iterator();
        while(var1.hasNext()) {
            NIOServerCnxnFactory.SelectorThread thread = (NIOServerCnxnFactory.SelectorThread)var1.next();
            if (thread.getState() == State.NEW) {
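                thread.start(); // start() ends up invoking the thread's run() method
                // This starts a thread that polls its selector for I/O events. If nothing is ready it blocks in select(): run() calls select(), which calls selector.select() on the multiplexer, and that call blocks until something becomes ready.
            }
        }
        if (this.acceptThread.getState() == State.NEW) {
            this.acceptThread.start();// handles incoming client connections
        }
    // A design worth borrowing: accepting connections and handling I/O are split into separate threads, and several selector threads share the polling work, which improves overall throughput.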
        if (this.expirerThread.getState() == State.NEW) {
            this.expirerThread.start();
        }
    }
public void run() {
            try {
                while(!NIOServerCnxnFactory.this.stopped && !this.acceptSocket.socket().isClosed()) {
                    try {
                        this.select();
                    } catch (RuntimeException var6) {
                        NIOServerCnxnFactory.LOG.warn("Ignoring unexpected runtime exception", var6);
                    } catch (Exception var7) {
                        NIOServerCnxnFactory.LOG.warn("Ignoring unexpected exception", var7);
                    }
                }
            } finally {
                this.closeSelector();
                if (!this.reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                NIOServerCnxnFactory.LOG.info("accept thread exitted run method");
            }
        }
private void select() {
            try {
                this.selector.select(); // the accept thread blocks here until a connection is ready to be accepted
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while(!NIOServerCnxnFactory.this.stopped && selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey)selectedKeys.next();
                    selectedKeys.remove();
                    if (key.isValid()) {
                        if (key.isAcceptable()) {
                            if (!this.doAccept()) {
                                this.pauseAccept(10L);
                            }
                        } else {
                            NIOServerCnxnFactory.LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                        }
                    }
                }
            } catch (IOException var3) {
                NIOServerCnxnFactory.LOG.warn("Ignoring IOException while selecting", var3);
            }
        }
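
The two ideas in this listing, sizing the selector thread pool from the core count and accepting connections through a non-blocking `ServerSocketChannel` registered with a `Selector`, can be tried out in a few lines. This is a minimal single-threaded sketch using only standard `java.nio` calls. The class `AcceptLoopSketch` and its methods are hypothetical, it binds to an ephemeral loopback port instead of 2181, and it omits the hand-off to selector threads that the real factory performs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical sketch of the accept side of an NIO server.
class AcceptLoopSketch {

    /** Same formula as in configure(): sqrt(cores / 2), but never fewer than one thread. */
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt(numCores / 2.0), 1);
    }

    /** Listens on an ephemeral loopback port, connects one client, and returns the number of accepted connections. */
    static int acceptOne() {
        try (ServerSocketChannel ss = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ss.socket().setReuseAddress(true);
            ss.socket().bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick
            ss.configureBlocking(false);                             // required before register()
            ss.register(selector, SelectionKey.OP_ACCEPT);
            int port = ss.socket().getLocalPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                selector.select(5000); // wait until the pending connection is reported
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        try (SocketChannel sc = ss.accept()) {
                            if (sc != null) {
                                accepted++; // the real code hands sc to a selector thread here
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selector threads for 8 cores: " + selectorThreads(8));
        System.out.println("accepted connections: " + acceptOne());
    }
}
```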


startLeaderElection: start leader election


public synchronized void startLeaderElection() {
        try {
            // check the current state of this node; if it is LOOKING
            if (this.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                // build a Vote(myid, zxid, epoch) for ourselves
                this.currentVote = new Vote(this.myid, this.getLastLoggedZxid(), this.getCurrentEpoch());
            }
        } catch (IOException var3) {
            RuntimeException re = new RuntimeException(var3.getMessage());
            re.setStackTrace(var3.getStackTrace());
            throw re;
        }
    // create the election algorithm that matches electionType
        this.electionAlg = this.createElectionAlgorithm(this.electionType);
    }
------------------------------------------------------------------------------
// In 3.6, whatever election algorithm is configured, only one is actually supported: as the source shows, cases 1 and 2 simply throw an exception.
protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch(electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            // "Cnxn" classes deal with network communication (compare ServerCnxn and ClientCnxn)
            // QuorumCnxManager manages the connections used for election and voting within the cluster
            QuorumCnxManager qcm = this.createCnxnManager();
            QuorumCnxManager oldQcm = (QuorumCnxManager)this.qcmRef.getAndSet(qcm);
            if (oldQcm != null) { // a previous connection manager exists, so an election was already started
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();// shut down the previous connection manager
            }
      // listen for votes coming from the other cluster members
            Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();
                // initialize FastLeaderElection
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();// start the election machinery
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
---------------------------------------------------------------------------
// Stepping into start()
public void start() {
        this.messenger.start();
    }
void start() {
    // start two threads
    /*
    This shows that ZooKeeper's overall flow is largely asynchronous
    WorkerSender: sends votes
    WorkerReceiver: receives votes
    */
            this.wsThread.start();
            this.wrThread.start();
        }
protected class Messenger {
        FastLeaderElection.Messenger.WorkerSender ws;
        FastLeaderElection.Messenger.WorkerReceiver wr;
        Thread wsThread = null;
        Thread wrThread = null;
        Messenger(QuorumCnxManager manager) {
            // create the two worker threads that start() will launch
            this.ws = new FastLeaderElection.Messenger.WorkerSender(manager);
            this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wsThread.setDaemon(true);
            this.wr = new FastLeaderElection.Messenger.WorkerReceiver(manager);
            this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wrThread.setDaemon(true);
        }
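
The sender/receiver split can be illustrated with two daemon threads and blocking queues. This is a minimal sketch under several assumptions: `MessengerSketch` is a hypothetical class, the `sendqueue` and `recvqueue` names echo the queues used by FastLeaderElection, votes are reduced to plain strings, and an in-memory `wire` queue stands in for the network connections that QuorumCnxManager manages.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the asynchronous sender/receiver pair.
class MessengerSketch {

    final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>(); // votes waiting to be sent
    final LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<>(); // votes received from peers
    private final LinkedBlockingQueue<String> wire = new LinkedBlockingQueue<>(); // stand-in for the network
    private volatile boolean stop = false;

    private final Thread wsThread = new Thread(() -> pump(sendqueue, wire), "WorkerSender");
    private final Thread wrThread = new Thread(() -> pump(wire, recvqueue), "WorkerReceiver");

    /** Starts both workers as daemon threads, as the Messenger constructor and start() do together. */
    void start() {
        wsThread.setDaemon(true);
        wrThread.setDaemon(true);
        wsThread.start();
        wrThread.start();
    }

    void halt() {
        stop = true;
    }

    /** Moves messages from one queue to the other until halted. */
    private void pump(LinkedBlockingQueue<String> from, LinkedBlockingQueue<String> to) {
        while (!stop) {
            try {
                String message = from.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    to.offer(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Sends one vote through both workers and returns what arrives in recvqueue (null on timeout). */
    static String roundTrip(String vote) {
        MessengerSketch messenger = new MessengerSketch();
        messenger.start();
        try {
            messenger.sendqueue.offer(vote);
            return messenger.recvqueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            messenger.halt();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("vote(myid=1, zxid=0x300000007, epoch=3)"));
    }
}
```

The election logic only ever touches the two queues, so it never blocks on a slow or unreachable peer.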

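All of the logic covered so far is summarized in the figure.


Final execution


Everything read so far is really initialization logic. The actual work begins with the call to super.start().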

// Stepping into start()
public synchronized void start() { // overrides Thread.start()
        if (!this.getView().containsKey(this.myid)) { // check that this myid appears in the configured cluster membership
            throw new RuntimeException("My id " + this.myid + " not in the peer list");
        } else {
            this.loadDataBase(); // load the data
            this.startServerCnxnFactory();// start the client listener on port 2181
            try {
                this.adminServer.start();
            } catch (AdminServerException var2) {
                LOG.warn("Problem starting AdminServer", var2);
                System.out.println(var2);
            }
            this.startLeaderElection(); // start leader election
            this.startJvmPauseMonitor(); // monitoring-related
            super.start(); // start the thread itself
        }
    }
--------------------------------------------------------------------------
public void run() {
        this.updateThreadName();
        LOG.debug("Starting quorum peer");
    // This whole block registers JMX beans, which is how the metrics are reported
        try {
            this.jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(this.jmxQuorumBean, (ZKMBeanInfo)null);
            Iterator var1 = this.getView().values().iterator();
            while(var1.hasNext()) {
                QuorumPeer.QuorumServer s = (QuorumPeer.QuorumServer)var1.next();
                if (this.getId() == s.id) {
                    LocalPeerBean p = this.jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, this.jmxQuorumBean);
                    } catch (Exception var88) {
                        LOG.warn("Failed to register with JMX", var88);
                        this.jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, this.jmxQuorumBean);
                        this.jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception var87) {
                        LOG.warn("Failed to register with JMX", var87);
                    }
                }
            }
        } catch (Exception var92) {
            LOG.warn("Failed to register with JMX", var92);
            this.jmxQuorumBean = null;
        }
        while(true) {
            boolean var27 = false;
            // loops forever for as long as the peer keeps running
            try {
                var27 = true;
                if (!this.running) {
                    var27 = false;
                    break;
                }
                switch(this.getPeerState()) { // current state of this node (LOOKING, FOLLOWING, LEADING, OBSERVING)
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1L);
                    // if read-only mode is enabled
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(this.logFactory, this, this.zkDb);
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    sleep((long)Math.max(2000, QuorumPeer.this.tickTime));
                                    if (QuorumPeer.ServerState.LOOKING.equals(QuorumPeer.this.getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException var2) {
                                    QuorumPeer.LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception var3) {
                                    QuorumPeer.LOG.error("FAILED to start ReadOnlyZooKeeperServer", var3);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var85) {
                            LOG.warn("Unexpected exception", var85);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        } finally {
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
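              // makeLEStrategy().lookForLeader() returns the vote that was agreed on, i.e. the vote for the elected leader. This node always takes part in the election, and once the leader is known its own state is updated accordingly.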
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var84) {
                            LOG.warn("Unexpected exception", var84);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        }
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        try {
                            // this node won the election, so set itself up as the leader
                            this.setLeader(this.makeLeader(this.logFactory));
                            this.leader.lead();
                            this.setLeader((Leader)null);
                        } catch (Exception var80) {
                            LOG.warn("Unexpected exception", var80);
                        }
                        break;
                    } finally {
                        if (this.leader != null) {
                            this.leader.shutdown("Forcing shutdown");
                            this.setLeader((Leader)null);
                        }
                        this.updateServerState();
                    }
                case FOLLOWING:
                    try {
                        try {
                            LOG.info("FOLLOWING");
                            this.setFollower(this.makeFollower(this.logFactory));
                            this.follower.followLeader();// this node is a follower, so connect to the leader
                        } catch (Exception var81) {
                            LOG.warn("Unexpected exception", var81);
                        }
                        break;
                    } finally {
                        this.follower.shutdown();
                        this.setFollower((Follower)null);
                        this.updateServerState();
                    }
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        this.setObserver(this.makeObserver(this.logFactory));
                        this.observer.observeLeader();
                    } catch (Exception var83) {
                        LOG.warn("Unexpected exception", var83);
                    } finally {
                        this.observer.shutdown();
                        this.setObserver((Observer)null);
                        this.updateServerState();
                        if (this.isRunning()) {
                            Observer.waitForObserverElectionDelay();
                        }
                    }
                }
            } finally {
                if (var27) {
                    LOG.warn("QuorumPeer main thread exited");
                    MBeanRegistry instance = MBeanRegistry.getInstance();
                    instance.unregister(this.jmxQuorumBean);
                    instance.unregister(this.jmxLocalPeerBean);
                    Iterator var12 = this.jmxRemotePeerBean.values().iterator();
                    while(var12.hasNext()) {
                        RemotePeerBean remotePeerBean = (RemotePeerBean)var12.next();
                        instance.unregister(remotePeerBean);
                    }
                    this.jmxQuorumBean = null;
                    this.jmxLocalPeerBean = null;
                    this.jmxRemotePeerBean = null;
                }
            }
        }
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(this.jmxQuorumBean);
        instance.unregister(this.jmxLocalPeerBean);
        Iterator var96 = this.jmxRemotePeerBean.values().iterator();
        while(var96.hasNext()) {
            RemotePeerBean remotePeerBean = (RemotePeerBean)var96.next();
            instance.unregister(remotePeerBean);
        }
        this.jmxQuorumBean = null;
        this.jmxLocalPeerBean = null;
        this.jmxRemotePeerBean = null;
    }


makeLEStrategy().lookForLeader(): the leader election algorithm


As the earlier source analysis showed, there is in fact only one election algorithm internally: FastLeaderElection.

public Vote lookForLeader() throws InterruptedException {
    // The first part is again JMX-related (registering monitoring beans)
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        } catch (Exception var24) {
            LOG.warn("Failed to register with JMX", var24);
            this.self.jmxLeaderElectionBean = null;
        }
        this.self.start_fle = Time.currentElapsedTime();
        try {
            Map<Long, Vote> recvset = new HashMap<>();
            Map<Long, Vote> outofelection = new HashMap<>();
            int notTimeout = minNotificationInterval;
            synchronized(this) {
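                // logicalclock is the logical clock, i.e. the election epoch (round); increment it atomically for the new round
                this.logicalclock.incrementAndGet();
                // Update our proposal (myid, zxid, epoch): the initial vote is for ourselves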
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
            }
            LOG.info("New election. My id = {}, proposed zxid=0x{}", this.self.getId(), Long.toHexString(this.proposedZxid));
            this.sendNotifications(); // Broadcast our vote to the other nodes in the cluster, so every node receives the votes of the others
            FastLeaderElection.Notification n;
            // Keep looping as long as this node is in the LOOKING state
            while(this.self.getPeerState() == ServerState.LOOKING && !this.stop) {
                // Take one Notification from the receive queue; it represents a vote received from some node in the cluster
                n = (FastLeaderElection.Notification)this.recvqueue.poll((long)notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) { // Nothing arrived within the timeout
                    if (this.manager.haveDelivered()) {
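                        this.sendNotifications(); // Resend our vote once more
                    } else {
                        this.manager.connectAll(); // Reconnect to the peers
                    }
                    // Back off before retrying: double the poll timeout, capped at maxNotificationInterval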
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                // First verify that the received Notification is valid:
                // both sid (the sender) and leader (the proposed leader) are myid values
                } else if (this.validVoter(n.sid) && this.validVoter(n.leader)) {
                    SyncedLearnerTracker voteSet;
                    Vote endVote;
                    Vote var7;
                    switch(n.state) { // State of the node that sent the notification
                    case LOOKING: // While an election is in progress, votes normally come from nodes that are also LOOKING
                        if (this.getInitLastLoggedZxid() == -1L) {
                            LOG.debug("Ignoring notification as our zxid is -1");  
                        } 
                        else if (n.zxid == -1L) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        } else {
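                            // The received election epoch is greater than our own logical clock,
                            // so the sender is in a newer round: move to that round and re-evaluate the vote
                            if (n.electionEpoch > this.logicalclock.get()) {
                                this.logicalclock.set(n.electionEpoch); // First set our logical clock to the sender's election epoch
                                recvset.clear(); // Clear the received votes and start counting again
                                // Compare the received vote with our own initial vote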
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                    // The received vote wins: adopt it (leader, zxid, epoch)
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                                }
                                // After updating, broadcast the vote again (it may be our own or someone else's, depending on epoch, zxid, and myid)
                                this.sendNotifications();
                            } else {
                                // A smaller election epoch means the notification comes from an older round, so it cannot affect our vote
                                if (n.electionEpoch < this.logicalclock.get()) {
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}", Long.toHexString(n.electionEpoch), Long.toHexString(this.logicalclock.get()));
                                    continue;
                                }
                                // Otherwise the election epochs are equal: compare against our current proposal
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    this.sendNotifications();
                                }
                            }
                            LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}", new Object[]{n.sid, n.leader, Long.toHexString(n.zxid), Long.toHexString(n.electionEpoch)});
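                            // After the comparisons above, record the sender's vote in recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            // Count the votes for our current proposal to decide whether the election can be concluded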
                            voteSet = this.getVoteTracker(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch));
--------------------------------------------------------------------------------  
    // In short, this method builds a SyncedLearnerTracker from the received votes and a target vote, and records as an ack every node whose vote equals the target.
    protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(this.self.getQuorumVerifier());
        if (this.self.getLastSeenQuorumVerifier() != null && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
        }
        Iterator var4 = votes.entrySet().iterator();
        while(var4.hasNext()) {
            Entry<Long, Vote> entry = (Entry)var4.next();
            if (vote.equals(entry.getValue())) {
                voteSet.addAck((Long)entry.getKey());
            }
        }
        return voteSet;
    }                    
--------------------------------------------------------------------------------
                            // This check essentially decides whether the currently proposed leader has been acknowledged by more than half of the voters
                            if (!voteSet.hasAllQuorums()) {
                                continue;
                            }
-------------------------------------------------------------------------------- 
public boolean hasAllQuorums() {
        Iterator var1 = this.qvAcksetPairs.iterator();
        SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset;
        do {
            if (!var1.hasNext()) {
                return true;
            }
            qvAckset = (SyncedLearnerTracker.QuorumVerifierAcksetPair)var1.next();
        } while(qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()));
        return false;
    }           
public boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > this.half;
    }                      
-------------------------------------------------------------------------------- 
    // Before finishing, check whether a better vote is still arriving that would change the result
    while((n = (FastLeaderElection.Notification)this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.recvqueue.put(n);
                                    break;
                                }
                            }
                            if (n == null) {
                                // Voting is over: set this node's final state
                                this.setPeerState(this.proposedLeader, voteSet);
                                // Build the final vote and leave this election instance
                                endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        if (n.electionEpoch == this.logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = this.getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && this.checkLeader(recvset, n.leader, n.electionEpoch)) {
                                this.setPeerState(n.leader, voteSet);
                                endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = this.getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && this.checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized(this) {
                                this.logicalclock.set(n.electionEpoch);
                                this.setPeerState(n.leader, voteSet);
                            }
                            endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            this.leaveInstance(endVote);
                            var7 = endVote;
                            return var7;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    }
                } else {
                    if (!this.validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!this.validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            n = null;
            return n;
        } finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            } catch (Exception var21) {
                LOG.warn("Failed to unregister with JMX", var21);
            }
            this.self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount());
        }
    }
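
The walkthrough above calls totalOrderPredicate several times without showing its body. The following is a minimal sketch of the ordering it is generally understood to apply: peer epoch first, then zxid, then server id (myid). The class name VoteOrder is hypothetical, the parameter order simply mirrors the calls in lookForLeader, and the sketch ignores the quorum-weight handling that the real method may perform.

```java
final class VoteOrder {
    private VoteOrder() {}

    /**
     * Returns true when the received vote (new*) should replace the current one (cur*).
     * Assumed ordering: epoch, then zxid, then server id. Simplified for illustration.
     */
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. the higher peer epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. then the more recent transaction id
        }
        return newId > curId;             // 3. finally the larger server id (myid)
    }

    public static void main(String[] args) {
        // Same epoch and zxid: server id 3 is compared against server id 1.
        System.out.println(totalOrderPredicate(3, 0x100000005L, 1, 1, 0x100000005L, 1));
        // Same epoch: a larger zxid is compared against a larger server id.
        System.out.println(totalOrderPredicate(1, 0x100000006L, 1, 3, 0x100000005L, 1));
        // A vote from a lower epoch is compared against one from a higher epoch.
        System.out.println(totalOrderPredicate(3, 0x100000009L, 1, 1, 0x200000000L, 2));
    }
}
```

Because the three fields are checked in a fixed priority, any two distinct votes are strictly ordered, which is what lets all nodes converge on the same proposal.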

Overall flow
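
The decision structure of the LOOKING branch in lookForLeader can be condensed into two steps: classify the notification by its election epoch, then check whether the current proposal has a strict majority. The sketch below is only an illustration under simplifying assumptions: ElectionFlowSketch, RoundAction, classify and hasMajority are hypothetical names, a vote is reduced to the proposed leader id (the real getVoteTracker compares whole Vote objects), and half is assumed to be the number of voting members divided by two.

```java
import java.util.HashMap;
import java.util.Map;

final class ElectionFlowSketch {
    /** What a LOOKING node does with a notification, judged only by the election round. */
    enum RoundAction { JOIN_NEWER_ROUND, IGNORE_STALE, COMPARE_IN_SAME_ROUND }

    static RoundAction classify(long receivedElectionEpoch, long logicalClock) {
        if (receivedElectionEpoch > logicalClock) {
            return RoundAction.JOIN_NEWER_ROUND;   // adopt the round, clear recvset, re-evaluate
        }
        if (receivedElectionEpoch < logicalClock) {
            return RoundAction.IGNORE_STALE;       // notification from an older round
        }
        return RoundAction.COMPARE_IN_SAME_ROUND;  // same round: keep whichever vote is better
    }

    /**
     * Counts the senders that back proposedLeader and checks for a strict majority.
     * recvset maps sender sid to the leader that sender voted for (a simplification of Vote).
     */
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        int acks = 0;
        for (long votedFor : recvset.values()) {
            if (votedFor == proposedLeader) {
                acks++;
            }
        }
        return acks > votingMembers / 2;           // assumed meaning of "half"
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);   // server 1 votes for server 3
        recvset.put(3L, 3L);   // server 3 votes for itself
        System.out.println(classify(2, 1));
        System.out.println(hasMajority(recvset, 3L, 3));
    }
}
```

With three voting members, two matching votes already exceed half, so a node does not need to hear from every peer before it can conclude the election.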


sendNotifications


private void sendNotifications() {
        Iterator var1 = this.self.getCurrentAndNextConfigVoters().iterator();
        while(var1.hasNext()) {
            long sid = (Long)var1.next();
            QuorumVerifier qv = this.self.getQuorumVerifier();
            FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, this.proposedLeader, this.proposedZxid, this.logicalclock.get(), ServerState.LOOKING, sid, this.proposedEpoch, qv.toString().getBytes());
            LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient), {} (myid), 0x{} (n.peerEpoch) ", new Object[]{this.proposedLeader, Long.toHexString(this.proposedZxid), Long.toHexString(this.logicalclock.get()), sid, this.self.getId(), Long.toHexString(this.proposedEpoch)});
            this.sendqueue.offer(notmsg);
        }
    }
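
Note that sendNotifications performs no network I/O itself: it builds one message per voter id and offers it to sendqueue, presumably leaving the actual transmission to whatever drains that queue. A minimal sketch of this fan-out pattern follows; NotificationFanOut, Outgoing and enqueueForAll are hypothetical names, and Outgoing is a trimmed-down stand-in for ToSend, not the real class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class NotificationFanOut {
    /** Hypothetical, reduced stand-in for ToSend: one proposal addressed to one recipient. */
    static final class Outgoing {
        final long recipientSid;
        final long proposedLeader;
        final long proposedZxid;
        final long electionEpoch;

        Outgoing(long recipientSid, long proposedLeader, long proposedZxid, long electionEpoch) {
            this.recipientSid = recipientSid;
            this.proposedLeader = proposedLeader;
            this.proposedZxid = proposedZxid;
            this.electionEpoch = electionEpoch;
        }
    }

    /** Enqueues the same proposal once per voter; a separate consumer would drain the queue. */
    static BlockingQueue<Outgoing> enqueueForAll(List<Long> voterSids, long leader,
                                                 long zxid, long electionEpoch) {
        BlockingQueue<Outgoing> sendQueue = new LinkedBlockingQueue<>();
        for (long sid : voterSids) {
            sendQueue.offer(new Outgoing(sid, leader, zxid, electionEpoch));
        }
        return sendQueue;
    }

    public static void main(String[] args) {
        BlockingQueue<Outgoing> queue = enqueueForAll(Arrays.asList(1L, 2L, 3L), 2L, 0x100000005L, 1L);
        for (Outgoing msg : queue) {
            System.out.println("to sid " + msg.recipientSid + ": vote for " + msg.proposedLeader);
        }
    }
}
```

Decoupling the election loop from the network through a queue means a slow or unreachable peer cannot block lookForLeader while it is comparing votes.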

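What happens when there are multiple nodes?

Assume three nodes whose initial votes are vote(1), vote(2), and vote(3).


Suppose the node that currently holds vote(1) receives a notification carrying vote(2) and finds that the epoch of vote(2) is larger than that of vote(1). It then has to change its vote from vote(1) to vote(2), and the next notification it sends out will also carry vote(2).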
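
The three-node scenario can be played through in a small simulation. This is only a sketch under stated assumptions: ThreeNodeElection, Ballot, beats and converge are hypothetical names, the comparison order (epoch, then zxid, then server id) is assumed, the concrete zxid and epoch values are made up for illustration, and message passing is replaced by synchronous rounds in which every node sees every vote.

```java
import java.util.Arrays;

final class ThreeNodeElection {
    /** Hypothetical, minimal vote: the proposed leader plus that candidate's zxid and epoch. */
    static final class Ballot {
        final long leader;
        final long zxid;
        final long epoch;

        Ballot(long leader, long zxid, long epoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        /** Assumed ordering: epoch first, then zxid, then server id. */
        boolean beats(Ballot other) {
            if (epoch != other.epoch) return epoch > other.epoch;
            if (zxid != other.zxid) return zxid > other.zxid;
            return leader > other.leader;
        }
    }

    /** Repeats "broadcast, then adopt the best vote seen" until no node changes its vote. */
    static long[] converge(Ballot[] initial) {
        Ballot[] current = initial.clone();
        boolean changed = true;
        while (changed) {
            changed = false;
            Ballot[] broadcast = current.clone();   // the votes sent out in this round
            for (int i = 0; i < current.length; i++) {
                for (Ballot received : broadcast) {
                    if (received.beats(current[i])) {
                        current[i] = received;       // switch, e.g. vote(1) -> vote(2)
                        changed = true;
                    }
                }
            }
        }
        long[] leaders = new long[current.length];
        for (int i = 0; i < current.length; i++) {
            leaders[i] = current[i].leader;
        }
        return leaders;
    }

    public static void main(String[] args) {
        Ballot[] votes = {
            new Ballot(1, 10, 1),   // vote(1)
            new Ballot(2, 3, 2),    // vote(2): the larger epoch
            new Ballot(3, 10, 1),   // vote(3)
        };
        System.out.println(Arrays.toString(converge(votes)));
    }
}
```

With these values every node ends up voting for server 2: its larger epoch outranks the higher zxid and the higher server id of the other two candidates.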
