Deep Dive into ZooKeeper, Part 3: ZooKeeper Internals and Leader Election Source Code Analysis (Part 2)


Previous part, Deep Dive into ZooKeeper, Part 3: ZooKeeper Internals and Leader Election Source Code Analysis (Part 1): https://developer.aliyun.com/article/1413768


How is this achieved? (the leader)


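If the elected leader has the largest zxid in the whole cluster, its data is the most up to date. This is why the zxid plays a role in leader election.


Every new round of leader election has an epoch, which increases monotonically. A zxid is therefore made of two parts: the epoch and a transaction counter.


Together, these two mechanisms are how the design ensures that the elected leader holds the most recent data.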


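zxid (transaction ID)


Every transaction generates an ID that is unique across the whole cluster. To preserve the ordering of transactions, ZooKeeper tags them with a monotonically increasing transaction ID. As mentioned earlier, every proposal increments the zxid.


A zxid is a 64-bit number: the high 32 bits hold the epoch (which changes every time leadership changes, acting as a logical clock for leader terms), and the low 32 bits hold an incrementing transaction counter.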
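A minimal sketch of the zxid layout described above. The class and method names are hypothetical (ZooKeeper ships its own helper for this, `ZxidUtils`); the point is only the bit arithmetic: shift the epoch into the high 32 bits and mask the counter into the low 32 bits.

```java
// Hypothetical helper illustrating the zxid layout: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    private ZxidSketch() {}

    // Combine an epoch and a per-epoch counter into one 64-bit zxid.
    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // The epoch lives in the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // The transaction counter lives in the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = make(2, 5);
        System.out.println("0x" + Long.toHexString(zxid)); // prints 0x200000005
        System.out.println(epochOf(zxid) + " " + counterOf(zxid)); // prints 2 5
        // A zxid from a newer epoch compares greater even though its counter restarts at 0.
        System.out.println(make(3, 0) > make(2, 999)); // prints true
    }
}
```

Because the epoch sits in the high bits, comparing two zxids as plain longs orders them first by epoch and then by counter, which is the property leader election relies on.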


Source code analysis: what we expect ZooKeeper to do at startup


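  • Load the configuration (zoo.cfg)
  • Initialize
  • Open port 2181 (a separate client-facing service) and listen for client (zkClient) requests
  • Open listeners on ports 2888 and 3888
  • Initialize leader election
  • Start leader election
  • Load data from disk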
// The code below is the startup flow of an older ZooKeeper release (3.4.x)
QuorumPeerMain main = new QuorumPeerMain();
try {
     main.initializeAndRun(args);
--------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        // Holds the global configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }
        // Start a scheduled task that purges old snapshots and transaction logs
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        // If a quorum (cluster) is configured, start in cluster mode
        if (args.length == 1 && config.servers.size() > 0) {
            this.runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running  in standalone mode");
            ZooKeeperServerMain.main(args); // otherwise, start in standalone mode
        }
    }
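The `config.servers.size() > 0` check is what decides between cluster and standalone mode. The sketch below imitates that decision on a zoo.cfg-style string. It is a simplified, hypothetical stand-in (`ModeSketch`) that only looks at `server.N=host:port:port` entries, whereas the real `QuorumPeerConfig.parse` validates far more (and 3.6 uses `isDistributed()` instead).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for QuorumPeerConfig: only server.N entries are examined.
final class ModeSketch {
    private ModeSketch() {}

    // Collect "server.<id>=host:quorumPort:electionPort" entries keyed by server id.
    static Map<Long, String> parseServers(String cfg) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfg));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory StringReader
        }
        Map<Long, String> servers = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                servers.put(Long.parseLong(key.substring("server.".length())), props.getProperty(key));
            }
        }
        return servers;
    }

    // Mirrors the older check: one argument (the config file) and at least one server entry.
    static boolean runAsQuorum(int argCount, Map<Long, String> servers) {
        return argCount == 1 && !servers.isEmpty();
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "tickTime=2000",
                "dataDir=/tmp/zookeeper",
                "clientPort=2181",
                "server.1=zk1:2888:3888",
                "server.2=zk2:2888:3888",
                "server.3=zk3:2888:3888");
        Map<Long, String> servers = parseServers(cfg);
        System.out.println(servers.keySet()); // prints [1, 2, 3]
        System.out.println(runAsQuorum(1, servers) ? "cluster mode (runFromConfig)" : "standalone mode (ZooKeeperServerMain)");
    }
}
```

In each `server.N` value, 2888 is the port followers use to connect to the leader for data sync and 3888 is the leader-election port.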
--------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException var4) {
            LOG.warn("Unable to register log4j JMX control", var4);
        }
        LOG.info("Starting quorum peer");
        try {
            /*
            ClientCnxn  handles client-side network communication with the server
            ServerCnxn  handles server-side network communication
            */
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
            // quorumPeer represents one node in the cluster; the settings loaded at startup are stored in it
            this.quorumPeer = new QuorumPeer();
            this.quorumPeer.setClientPortAddress(config.getClientPortAddress());
            this.quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())));
            this.quorumPeer.setQuorumPeers(config.getServers());
            this.quorumPeer.setElectionType(config.getElectionAlg()); // election algorithm
            this.quorumPeer.setMyid(config.getServerId());
            this.quorumPeer.setTickTime(config.getTickTime()); // heartbeat interval
            this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            this.quorumPeer.setInitLimit(config.getInitLimit()); // initial sync time limit (in ticks)
            this.quorumPeer.setSyncLimit(config.getSyncLimit()); // sync time limit (in ticks)
            this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
            this.quorumPeer.setCnxnFactory(cnxnFactory);
            this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, persisted to disk
            this.quorumPeer.setLearnerType(config.getPeerType());
            this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
            this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            this.quorumPeer.start(); // start the peer
            this.quorumPeer.join(); // quorumPeer is a thread; wait until it finishes
        } catch (InterruptedException var3) {
            LOG.warn("Quorum Peer interrupted", var3);
        }
    }


Leader election: source code and how it works


Building the source


  • Built with Maven


Starting the ZooKeeper server


  • Cluster mode: QuorumPeerMain
  • Standalone mode: ZooKeeperServerMain


Expectations


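  • Load the configuration from zoo.cfg
  • Listen on port 2181 (NIO or Netty; NIO by default, as covered in the earlier TCP lesson)
  • Initialize the listeners on ports 2888 and 3888 for data synchronization and leader election, using blocking I/O (BIO)
  • Initialize the election algorithm and run the election (to pick a leader)
  • Load and recover local data files
  • Synchronize data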


Source code analysis


Based on ZooKeeper 3.6.1

 public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
---------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
        // Holds the global configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // args[0] is the zoo.cfg path; parse it into QuorumPeerConfig
        }
        // Start a scheduled task that purges old snapshots and transaction logs
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        if (args.length == 1 && config.isDistributed()) { // cluster mode
            this.runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
            ZooKeeperServerMain.main(args); // standalone mode
        }
    }
---------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException var17) {
            LOG.warn("Unable to register log4j JMX control", var17);
        }
        LOG.info("Starting quorum peer");
        // Metrics provider: publishes ZooKeeper's metrics
        MetricsProvider metricsProvider;
        try {
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException var16) {
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), var16);
        }
        try {
            ServerMetrics.metricsProviderInitialized(metricsProvider);
            /*
            Related to listening on port 2181:
            ClientCnxn  handles client-side network communication with the server
            ServerCnxn  handles server-side network communication
            */
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;
            if (config.getClientPortAddress() != null) { // the default case
                cnxnFactory = ServerCnxnFactory.createFactory();
                // configure the listener on the client port
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }
            // secure (SSL) client connections
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }
            this.quorumPeer = this.getQuorumPeer();
            this.quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            this.quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            this.quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            this.quorumPeer.setElectionType(config.getElectionAlg()); // which election algorithm to use
            this.quorumPeer.setMyid(config.getServerId()); // myid
            this.quorumPeer.setTickTime(config.getTickTime()); // heartbeat interval (tickTime, commonly 2000 ms)
            this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            this.quorumPeer.setInitLimit(config.getInitLimit()); // initial sync time limit (in ticks)
            this.quorumPeer.setSyncLimit(config.getSyncLimit()); // sync time limit (in ticks)
            this.quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            this.quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            this.quorumPeer.setConfigFileName(config.getConfigFilename());
            this.quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, persisted to disk
            this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                this.quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            this.quorumPeer.initConfigInZKDatabase();
            this.quorumPeer.setCnxnFactory(cnxnFactory);
            this.quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            this.quorumPeer.setSslQuorum(config.isSslQuorum());
            this.quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            this.quorumPeer.setLearnerType(config.getPeerType());
            this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
            this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                this.quorumPeer.getX509Util().enableCertFileReloading();
            }
            this.quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            this.quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            this.quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
            this.quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (this.quorumPeer.isQuorumSaslAuthEnabled()) {
                this.quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                this.quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                this.quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                this.quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                this.quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            this.quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            this.quorumPeer.initialize();
            if (config.jvmPauseMonitorToRun) {
                this.quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }
            this.quorumPeer.start(); // start the peer
            ZKAuditProvider.addZKStartStopAuditLog();
            this.quorumPeer.join(); // quorumPeer is a thread; wait until it finishes
        } catch (InterruptedException var15) {
            LOG.warn("Quorum Peer interrupted", var15);
        } finally {
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable var14) {
                    LOG.warn("Error while stopping metrics", var14);
                }
            }
        }
    }

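Connecting to ZooKeeper's monitoring data

Once connected, you can see the attributes and metrics it exposes.

When we create a znode, it is stored in ZooKeeper's in-memory database, ZKDatabase, which holds a DataTree object; the DataTree is where the znode data is actually kept.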
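To make the DataTree idea concrete, here is a heavily simplified, hypothetical model (`MiniDataTree`): a concurrent map from full path to node, where each node keeps its data and the names of its children. It assumes only what the text says (znodes live in an in-memory tree) and leaves out stats, ACLs, ephemeral nodes and watches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of a DataTree: full path -> node.
final class MiniDataTree {
    // One znode: its payload plus the names of its direct children.
    static final class Node {
        final byte[] data;
        final Set<String> children = new TreeSet<>();

        Node(byte[] data) {
            this.data = data;
        }
    }

    private final Map<String, Node> nodes = new ConcurrentHashMap<>();

    MiniDataTree() {
        nodes.put("/", new Node(new byte[0])); // the root always exists
    }

    // Create a znode; as in ZooKeeper, the parent must already exist.
    synchronized void create(String path, byte[] data) {
        int slash = path.lastIndexOf('/');
        String parentPath = slash == 0 ? "/" : path.substring(0, slash);
        Node parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IllegalStateException("no parent: " + parentPath);
        }
        if (nodes.putIfAbsent(path, new Node(data)) != null) {
            throw new IllegalStateException("node exists: " + path);
        }
        parent.children.add(path.substring(slash + 1));
    }

    byte[] getData(String path) {
        Node n = nodes.get(path);
        return n == null ? null : n.data;
    }

    // Returns a copy so callers cannot mutate the tree.
    synchronized Set<String> getChildren(String path) {
        Node n = nodes.get(path);
        return n == null ? Collections.<String>emptySet() : new TreeSet<String>(n.children);
    }

    public static void main(String[] args) {
        MiniDataTree tree = new MiniDataTree();
        tree.create("/app", "v1".getBytes(StandardCharsets.UTF_8));
        tree.create("/app/config", new byte[0]);
        System.out.println(tree.getChildren("/app")); // prints [config]
        System.out.println(new String(tree.getData("/app"), StandardCharsets.UTF_8)); // prints v1
    }
}
```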

// QuorumPeer.start()
public synchronized void start() { // overrides Thread.start()
        if (!this.getView().containsKey(this.myid)) { // make sure this server's myid is in the cluster configuration
            throw new RuntimeException("My id " + this.myid + " not in the peer list");
        } else {
            this.loadDataBase(); // load data from disk
            this.startServerCnxnFactory(); // start the client listener on port 2181
            try {
                this.adminServer.start();
            } catch (AdminServerException var2) {
                LOG.warn("Problem starting AdminServer", var2);
                System.out.println(var2);
            }
            this.startLeaderElection(); // start leader election
            this.startJvmPauseMonitor(); // JVM pause monitoring
            super.start(); // actually start the thread, which runs run()
        }
    }
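`QuorumPeer` extends `Thread` and overrides `start()`: the setup steps run on the calling thread, and only `super.start()` launches `run()` on a new thread. A minimal sketch of that pattern; the class name and the recorded step names are hypothetical stand-ins for the real calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class mimicking QuorumPeer.start(): setup first, then super.start().
class PeerThreadSketch extends Thread {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    public synchronized void start() {
        events.add("loadDataBase");           // stand-ins for the real setup steps
        events.add("startServerCnxnFactory");
        events.add("startLeaderElection");
        super.start();                         // only now does run() begin on a new thread
    }

    @Override
    public void run() {
        events.add("run: main loop");          // QuorumPeer.run() would loop over peer states here
    }

    public static void main(String[] args) throws InterruptedException {
        PeerThreadSketch peer = new PeerThreadSketch();
        peer.start();
        peer.join(); // like runFromConfig(): block until the peer thread finishes
        System.out.println(peer.events); // prints [loadDataBase, startServerCnxnFactory, startLeaderElection, run: main loop]
    }
}
```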


startServerCnxnFactory: starting the client listener


---------------------------------------------------------------
// Initialization (excerpt from runFromConfig)
if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
public static ServerCnxnFactory createFactory() throws IOException {
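        // Read the zookeeper.serverCnxnFactory JVM system property
        String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
        if (serverCnxnFactoryName == null) {
            // Not set: default to NIO
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            // Load and instantiate the implementation by class name via reflection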
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
            LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
            return serverCnxnFactory;
        } catch (Exception var3) {
            IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, var3);
            throw ioe;
        }
    }
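`createFactory()` follows a common plug-in pattern: read a system property, fall back to a default class name, then instantiate the class reflectively through its no-arg constructor. A self-contained sketch of the same pattern; the interface, both implementations and the property name `demo.serverCnxnFactory` are hypothetical.

```java
// Hypothetical sketch of the createFactory() pattern: system property -> class name -> reflection.
final class FactoryLoader {
    // Illustrative property name; ZooKeeper itself reads "zookeeper.serverCnxnFactory".
    static final String PROPERTY = "demo.serverCnxnFactory";

    interface CnxnFactory {
        String name();
    }

    // Default implementation, playing the role of NIOServerCnxnFactory.
    public static final class NioFactory implements CnxnFactory {
        public String name() { return "nio"; }
    }

    // Alternative implementation selected through the system property.
    public static final class NettyLikeFactory implements CnxnFactory {
        public String name() { return "netty-like"; }
    }

    static CnxnFactory create() {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            className = NioFactory.class.getName(); // fall back to the default
        }
        try {
            return (CnxnFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().name()); // prints nio
        System.setProperty(PROPERTY, NettyLikeFactory.class.getName());
        System.out.println(create().name()); // prints netty-like
        System.clearProperty(PROPERTY);
    }
}
```

The design keeps the server decoupled from concrete transports: switching implementations is a JVM flag (`-D...`) rather than a code change.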
---------------------------------------------------------------
// Each factory implements configure() differently; this is NIOServerCnxnFactory.configure()
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        } else {
            // Configure SASL login (authentication)
            this.configureSaslLogin();
            this.maxClientCnxns = maxcc;
            this.initMaxCnxns();
            this.sessionlessCnxnTimeout = Integer.getInteger("zookeeper.nio.sessionlessCnxnTimeout", 10000);
            this.cnxnExpiryQueue = new ExpiryQueue(this.sessionlessCnxnTimeout);
            this.expirerThread = new NIOServerCnxnFactory.ConnectionExpirerThread();
            int numCores = Runtime.getRuntime().availableProcessors(); // number of available processor cores
            this.numSelectorThreads = Integer.getInteger("zookeeper.nio.numSelectorThreads", Math.max((int)Math.sqrt((double)((float)numCores / 2.0F)), 1));
            if (this.numSelectorThreads < 1) {
                throw new IOException("numSelectorThreads must be at least 1");
            } else {
                this.numWorkerThreads = Integer.getInteger("zookeeper.nio.numWorkerThreads", 2 * numCores);
                this.workerShutdownTimeoutMS = Long.getLong("zookeeper.nio.shutdownTimeout", 5000L);
                String logMsg = "Configuring NIO connection handler with " + this.sessionlessCnxnTimeout / 1000 + "s sessionless connection timeout, " + this.numSelectorThreads + " selector thread(s), " + (this.numWorkerThreads > 0 ? this.numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : "" + directBufferBytes / 1024 + " kB direct buffers.");
                LOG.info(logMsg);
                // Create the number of selector threads computed above
                for(int i = 0; i < this.numSelectorThreads; ++i) {
                    this.selectorThreads.add(new NIOServerCnxnFactory.SelectorThread(i));
                }
                this.listenBacklog = backlog;
                // Open a ServerSocketChannel
                this.ss = ServerSocketChannel.open();
                this.ss.socket().setReuseAddress(true);
                LOG.info("binding to port {}", addr);
                if (this.listenBacklog == -1) {
                    this.ss.socket().bind(addr); // bind to the listening port
                } else {
                    this.ss.socket().bind(addr, this.listenBacklog);
                }
                this.ss.configureBlocking(false); // non-blocking mode
                this.acceptThread = new NIOServerCnxnFactory.AcceptThread(this.ss, addr, this.selectorThreads); // accept thread
                /*
                 * AcceptThread accepts incoming client connections.
                 * selectorThreads handle read/write readiness events on their selectors.
                 */
            }
        }
    }
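The default thread counts in `configure()` come from two small formulas: selector threads = max((int) sqrt(cores / 2), 1) and worker threads = 2 * cores, unless overridden by the `zookeeper.nio.numSelectorThreads` and `zookeeper.nio.numWorkerThreads` system properties. This sketch only reproduces that arithmetic (the class name is hypothetical) so you can see how it scales.

```java
// Reproduces the default-sizing arithmetic from NIOServerCnxnFactory.configure() shown above,
// ignoring the zookeeper.nio.* system-property overrides.
final class NioThreadDefaults {
    private NioThreadDefaults() {}

    // max((int) sqrt(cores / 2), 1)
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((double) ((float) numCores / 2.0F)), 1);
    }

    // 2 * cores
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 2, 8, 32}) {
            // e.g. "8 cores -> 2 selector, 16 worker threads"
            System.out.println(cores + " cores -> " + selectorThreads(cores) + " selector, "
                    + workerThreads(cores) + " worker threads");
        }
    }
}
```

The selector count grows only with the square root of the core count because each selector thread can multiplex many connections, while the worker pool, which does the actual request processing, grows linearly.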
-----------------------------------------------------------------------
private void startServerCnxnFactory() {
        if (this.cnxnFactory != null) {
            this.cnxnFactory.start(); // assigned earlier in runFromConfig()
        }
        if (this.secureCnxnFactory != null) {
            this.secureCnxnFactory.start(); // assigned earlier in runFromConfig()
        }
    }
public void start() {
        this.stopped = false;
        if (this.workerPool == null) {
            this.workerPool = new WorkerService("NIOWorker", this.numWorkerThreads, false);
        }
        Iterator var1 = this.selectorThreads.iterator();
        while(var1.hasNext()) {
            NIOServerCnxnFactory.SelectorThread thread = (NIOServerCnxnFactory.SelectorThread)var1.next();
            if (thread.getState() == State.NEW) {
                thread.start(); // Thread.start() runs the thread's run() method on a new thread
                // This starts a selector thread that polls for I/O. In its run() loop, selector.select() blocks while no channel is ready.
            }
        }
        if (this.acceptThread.getState() == State.NEW) {
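            this.acceptThread.start(); // handles incoming client connections
        }
        // Worth borrowing: accepting connections and processing I/O are split across separate threads, and several selector threads poll in parallel, which improves overall throughput.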
        if (this.expirerThread.getState() == State.NEW) {
            this.expirerThread.start();
        }
    }
// AcceptThread.run(): keep selecting until the factory stops or the server socket closes
public void run() {
            try {
                while(!NIOServerCnxnFactory.this.stopped && !this.acceptSocket.socket().isClosed()) {
                    try {
                        this.select();
                    } catch (RuntimeException var6) {
                        NIOServerCnxnFactory.LOG.warn("Ignoring unexpected runtime exception", var6);
                    } catch (Exception var7) {
                        NIOServerCnxnFactory.LOG.warn("Ignoring unexpected exception", var7);
                    }
                }
            } finally {
                this.closeSelector();
                if (!this.reconfiguring) {
                    NIOServerCnxnFactory.this.stop();
                }
                NIOServerCnxnFactory.LOG.info("accept thread exitted run method");
            }
        }
private void select() {
            try {
                this.selector.select(); // block until a client connection is ready to be accepted
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while(!NIOServerCnxnFactory.this.stopped && selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey)selectedKeys.next();
                    selectedKeys.remove();
                    if (key.isValid()) {
                        if (key.isAcceptable()) {
                            if (!this.doAccept()) {
                                this.pauseAccept(10L);
                            }
                        } else {
                            NIOServerCnxnFactory.LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                        }
                    }
                }
            } catch (IOException var3) {
                NIOServerCnxnFactory.LOG.warn("Ignoring IOException while selecting", var3);
            }
        }
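In 3.6, `doAccept()` (not shown above) hands each accepted `SocketChannel` to the selector threads in round-robin order, resetting its iterator when it reaches the end of the list. A hypothetical sketch of just that dispatch step, with strings in place of channels and plain queues in place of selector threads.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of round-robin connection dispatch from one accept thread
// to several selector threads. Strings stand in for SocketChannels.
final class RoundRobinDispatch {
    // Stand-in for a selector thread: only its queue of newly accepted connections.
    static final class SelectorStub {
        final int id;
        final Queue<String> accepted = new ConcurrentLinkedQueue<>();

        SelectorStub(int id) {
            this.id = id;
        }
    }

    private final List<SelectorStub> selectors = new ArrayList<>();
    private Iterator<SelectorStub> it;

    RoundRobinDispatch(int numSelectors) {
        for (int i = 0; i < numSelectors; i++) {
            selectors.add(new SelectorStub(i));
        }
        it = selectors.iterator();
    }

    // Give the connection to the next selector, starting over at the end of the list.
    SelectorStub dispatch(String connection) {
        if (!it.hasNext()) {
            it = selectors.iterator();
        }
        SelectorStub s = it.next();
        s.accepted.add(connection);
        return s;
    }

    List<SelectorStub> selectors() {
        return selectors;
    }

    public static void main(String[] args) {
        RoundRobinDispatch d = new RoundRobinDispatch(2);
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            order.append(d.dispatch("client-" + i).id).append(' ');
        }
        System.out.println(order.toString().trim()); // prints 0 1 0 1 0
    }
}
```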


startLeaderElection: starting leader election


public synchronized void startLeaderElection() {
        try {
            // If this node is in the LOOKING state
            if (this.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                // build an initial vote for itself: Vote(myid, zxid, epoch)
                this.currentVote = new Vote(this.myid, this.getLastLoggedZxid(), this.getCurrentEpoch());
            }
        } catch (IOException var3) {
            RuntimeException re = new RuntimeException(var3.getMessage());
            re.setStackTrace(var3.getStackTrace());
            throw re;
        }
        // Create the election algorithm according to electionType
        this.electionAlg = this.createElectionAlgorithm(this.electionType);
    }
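The `Vote(myid, zxid, epoch)` built here is later compared with votes from other servers. A simplified sketch of that comparison, modeled on `FastLeaderElection.totalOrderPredicate`: a higher epoch wins, then a higher zxid, then a higher server id. The real method also ignores servers with zero quorum weight, which this sketch omits; the class names are hypothetical.

```java
// Simplified vote ordering modeled on FastLeaderElection.totalOrderPredicate:
// higher epoch wins, then higher zxid, then higher server id.
final class VoteOrder {
    static final class Vote {
        final long id, zxid, epoch;

        Vote(long id, long zxid, long epoch) {
            this.id = id;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return "Vote(id=" + id + ", zxid=0x" + Long.toHexString(zxid) + ", epoch=" + epoch + ")";
        }
    }

    // True if the candidate vote should replace the current one.
    static boolean beats(Vote candidate, Vote current) {
        if (candidate.epoch != current.epoch) return candidate.epoch > current.epoch;
        if (candidate.zxid != current.zxid) return candidate.zxid > current.zxid;
        return candidate.id > current.id;
    }

    public static void main(String[] args) {
        Vote mine = new Vote(1, 0x100000005L, 1);
        Vote fromPeer3 = new Vote(3, 0x100000005L, 1); // same data, larger myid
        Vote fromPeer2 = new Vote(2, 0x100000007L, 1); // newer data
        Vote best = mine;
        for (Vote v : new Vote[] {fromPeer3, fromPeer2}) {
            if (beats(v, best)) best = v;
        }
        System.out.println(best); // prints Vote(id=2, zxid=0x100000007, epoch=1)
    }
}
```

This is why the server with the most recent data tends to win: the myid only breaks ties between servers whose epoch and zxid are equal.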
------------------------------------------------------------------------------
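// In 3.6, only algorithm 3 (FastLeaderElection) works, whatever is configured: as the source shows, algorithms 1 and 2 throw UnsupportedOperationException.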
protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch(electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            // "cnxn" classes deal with network communication (e.g. ServerCnxn, ClientCnxn)
            // QuorumCnxManager manages the connections used for leader election and voting
            QuorumCnxManager qcm = this.createCnxnManager();
            QuorumCnxManager oldQcm = (QuorumCnxManager)this.qcmRef.getAndSet(qcm);
            if (oldQcm != null) { // a previous manager exists, i.e. an election was already started
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt(); // stop the previous election's connection manager
            }
            // The listener accepts election connections (votes) from other servers in the cluster
            Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();
                // create FastLeaderElection
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start(); // start leader election
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
---------------------------------------------------------------------------
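// FastLeaderElection.start()
public void start() {
        this.messenger.start();
    }
// Messenger.start()
void start() {
            /*
            Starts two threads; this shows that ZooKeeper's election flow is largely asynchronous:
            WorkerSender  sends votes
            WorkerReceiver  receives votes
            */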
            this.wsThread.start();
            this.wrThread.start();
        }
protected class Messenger {
        FastLeaderElection.Messenger.WorkerSender ws;
        FastLeaderElection.Messenger.WorkerReceiver wr;
        Thread wsThread = null;
        Thread wrThread = null;
        Messenger(QuorumCnxManager manager) {
            // Initialize the threads that start() will launch
            this.ws = new FastLeaderElection.Messenger.WorkerSender(manager);
            this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wsThread.setDaemon(true);
            this.wr = new FastLeaderElection.Messenger.WorkerReceiver(manager);
            this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
            this.wrThread.setDaemon(true);
        }
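The Messenger decouples election logic from networking: `lookForLeader()` only enqueues outgoing votes and polls `recvqueue`, while `WorkerSender` and `WorkerReceiver` move data in the background. A hypothetical sketch of that shape, in which an in-memory loopback queue (`wire`) stands in for `QuorumCnxManager` and its TCP connections.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the Messenger pattern: election code only touches sendqueue/recvqueue,
// and two daemon threads move messages in the background.
final class MessengerSketch {
    final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    // Stand-in for QuorumCnxManager: a loopback queue instead of real TCP connections.
    private final BlockingQueue<String> wire = new LinkedBlockingQueue<>();

    void start() {
        Thread ws = new Thread(() -> pump(sendqueue, wire), "WorkerSender");
        Thread wr = new Thread(() -> pump(wire, recvqueue), "WorkerReceiver");
        ws.setDaemon(true); // daemon threads, as in Messenger, so they do not keep the JVM alive
        wr.setDaemon(true);
        ws.start();
        wr.start();
    }

    // Move messages from one queue to another until interrupted.
    private static void pump(BlockingQueue<String> from, BlockingQueue<String> to) {
        try {
            while (true) {
                to.put(from.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MessengerSketch m = new MessengerSketch();
        m.start();
        m.sendqueue.offer("vote(myid=1, zxid=0x0, epoch=1)");
        // Like lookForLeader(): wait a bounded time for a notification instead of blocking forever.
        String n = m.recvqueue.poll(200, TimeUnit.MILLISECONDS);
        System.out.println(n == null ? "timeout: would resend or reconnect" : "received " + n);
    }
}
```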

All of the logic above is summarized in the following figure.


Where execution really happens


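Reading through the code above, it is almost all initialization logic; the real work starts with super.start() at the end of start(), which launches the thread and runs QuorumPeer.run().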

// QuorumPeer.start()
public synchronized void start() { // overrides Thread.start()
        if (!this.getView().containsKey(this.myid)) { // make sure this server's myid is in the cluster configuration
            throw new RuntimeException("My id " + this.myid + " not in the peer list");
        } else {
            this.loadDataBase(); // load data from disk
            this.startServerCnxnFactory(); // start the client listener on port 2181
            try {
                this.adminServer.start();
            } catch (AdminServerException var2) {
                LOG.warn("Problem starting AdminServer", var2);
                System.out.println(var2);
            }
            this.startLeaderElection(); // start leader election
            this.startJvmPauseMonitor(); // JVM pause monitoring
            super.start(); // actually start the thread, which runs run()
        }
    }
--------------------------------------------------------------------------
public void run() {
        this.updateThreadName();
        LOG.debug("Starting quorum peer");
        // This whole block registers JMX beans, i.e. exposes metrics
        try {
            this.jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(this.jmxQuorumBean, (ZKMBeanInfo)null);
            Iterator var1 = this.getView().values().iterator();
            while(var1.hasNext()) {
                QuorumPeer.QuorumServer s = (QuorumPeer.QuorumServer)var1.next();
                if (this.getId() == s.id) {
                    LocalPeerBean p = this.jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, this.jmxQuorumBean);
                    } catch (Exception var88) {
                        LOG.warn("Failed to register with JMX", var88);
                        this.jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, this.jmxQuorumBean);
                        this.jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception var87) {
                        LOG.warn("Failed to register with JMX", var87);
                    }
                }
            }
        } catch (Exception var92) {
            LOG.warn("Failed to register with JMX", var92);
            this.jmxQuorumBean = null;
        }
        while(true) {
            boolean var27 = false;
            // Loops forever while the server is running
            try {
                var27 = true;
                if (!this.running) {
                    var27 = false;
                    break;
                }
                switch(this.getPeerState()) { // current peer state (LOOKING, LEADING, FOLLOWING, OBSERVING)
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1L);
                    // If read-only mode is enabled
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(this.logFactory, this, this.zkDb);
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    sleep((long)Math.max(2000, QuorumPeer.this.tickTime));
                                    if (QuorumPeer.ServerState.LOOKING.equals(QuorumPeer.this.getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException var2) {
                                    QuorumPeer.LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception var3) {
                                    QuorumPeer.LOG.error("FAILED to start ReadOnlyZooKeeperServer", var3);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var85) {
                            LOG.warn("Unexpected exception", var85);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        } finally {
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
                            // lookForLeader() returns the vote a quorum has agreed on, i.e. the elected leader's vote; this node takes part in that election, and once the leader is known its own state is updated accordingly
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var84) {
                            LOG.warn("Unexpected exception", var84);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        }
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        try {
                            // This node won the election: create the Leader and start leading
                            this.setLeader(this.makeLeader(this.logFactory));
                            this.leader.lead();
                            this.setLeader((Leader)null);
                        } catch (Exception var80) {
                            LOG.warn("Unexpected exception", var80);
                        }
                        break;
                    } finally {
                        if (this.leader != null) {
                            this.leader.shutdown("Forcing shutdown");
                            this.setLeader((Leader)null);
                        }
                        this.updateServerState();
                    }
                case FOLLOWING:
                    try {
                        try {
                            LOG.info("FOLLOWING");
                            this.setFollower(this.makeFollower(this.logFactory));
                            this.follower.followLeader(); // this node is a follower: connect to the leader and follow it
                        } catch (Exception var81) {
                            LOG.warn("Unexpected exception", var81);
                        }
                        break;
                    } finally {
                        this.follower.shutdown();
                        this.setFollower((Follower)null);
                        this.updateServerState();
                    }
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        this.setObserver(this.makeObserver(this.logFactory));
                        this.observer.observeLeader();
                    } catch (Exception var83) {
                        LOG.warn("Unexpected exception", var83);
                    } finally {
                        this.observer.shutdown();
                        this.setObserver((Observer)null);
                        this.updateServerState();
                        if (this.isRunning()) {
                            Observer.waitForObserverElectionDelay();
                        }
                    }
                }
            } finally {
                if (var27) {
                    LOG.warn("QuorumPeer main thread exited");
                    MBeanRegistry instance = MBeanRegistry.getInstance();
                    instance.unregister(this.jmxQuorumBean);
                    instance.unregister(this.jmxLocalPeerBean);
                    Iterator var12 = this.jmxRemotePeerBean.values().iterator();
                    while(var12.hasNext()) {
                        RemotePeerBean remotePeerBean = (RemotePeerBean)var12.next();
                        instance.unregister(remotePeerBean);
                    }
                    this.jmxQuorumBean = null;
                    this.jmxLocalPeerBean = null;
                    this.jmxRemotePeerBean = null;
                }
            }
        }
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(this.jmxQuorumBean);
        instance.unregister(this.jmxLocalPeerBean);
        Iterator var96 = this.jmxRemotePeerBean.values().iterator();
        while(var96.hasNext()) {
            RemotePeerBean remotePeerBean = (RemotePeerBean)var96.next();
            instance.unregister(remotePeerBean);
        }
        this.jmxQuorumBean = null;
        this.jmxLocalPeerBean = null;
        this.jmxRemotePeerBean = null;
    }


makeLEStrategy().lookForLeader(): the leader election algorithm


As the earlier source analysis showed, only one election algorithm is actually left internally: FastLeaderElection.

public Vote lookForLeader() throws InterruptedException {
        // The first part again deals with JMX: register a bean for monitoring data
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        } catch (Exception var24) {
            LOG.warn("Failed to register with JMX", var24);
            this.self.jmxLeaderElectionBean = null;
        }
        this.self.start_fle = Time.currentElapsedTime();
        try {
            Map<Long, Vote> recvset = new HashMap();
            Map<Long, Vote> outofelection = new HashMap();
            int notTimeout = minNotificationInterval;
            synchronized(this) {
                // Atomically increment logicalclock, the logical clock (election epoch) of this round
                this.logicalclock.incrementAndGet();
                // Initialize the proposal (myid, zxid, epoch) with this node's own information, i.e. vote for itself
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
            }
            LOG.info("New election. My id = {}, proposed zxid=0x{}", this.self.getId(), Long.toHexString(this.proposedZxid));
            this.sendNotifications(); // Broadcast our vote to every voting member; since every node does this, each node receives the others' votes
            FastLeaderElection.Notification n;
            // Keep looping while this node is LOOKING and the election has not been stopped
            while(this.self.getPeerState() == ServerState.LOOKING && !this.stop) {
                // Take a Notification from the receive queue; each one carries a vote received from some node in the cluster
                n = (FastLeaderElection.Notification)this.recvqueue.poll((long)notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) { // No notification arrived before the timeout
                    if (this.manager.haveDelivered()) {
                        this.sendNotifications(); // Send our vote again
                    } else {
                        this.manager.connectAll(); // Reconnect to all peers
                    }
                    // Back off before retrying: double the wait, capped at maxNotificationInterval
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
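                // Otherwise, first check that the Notification is valid:
                // n.sid (the sender) and n.leader (the proposed leader) are both server ids (myid) and must be voting members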
                } else if (this.validVoter(n.sid) && this.validVoter(n.leader)) {
                    SyncedLearnerTracker voteSet;
                    Vote endVote;
                    Vote var7;
                    switch(n.state) { // State of the node that sent the notification
                    case LOOKING: // During the election phase, every voting node that sends a vote is LOOKING
                        if (this.getInitLastLoggedZxid() == -1L) {
                            LOG.debug("Ignoring notification as our zxid is -1");  
                        } 
                        else if (n.zxid == -1L) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        } else {
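                            // The received election epoch is greater than ours: the sender is in a newer round,
                            // so move to that round (its vote is not adopted blindly, see the comparison below)
                            if (n.electionEpoch > this.logicalclock.get()) {
                                this.logicalclock.set(n.electionEpoch); // First set our logical clock to the sender's election epoch
                                recvset.clear(); // Discard the votes collected so far and start tallying again
                                // Compare the received vote with our own initial vote
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                    // Adopt the received vote (leader, zxid, epoch)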
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                                }
                                // After updating, broadcast the vote again: either our own or the adopted one, depending on epoch, zxid and myid
                                this.sendNotifications();
                            } else {
                                // The received election epoch is smaller than ours: the notification is from a stale round and cannot affect our vote
                                if (n.electionEpoch < this.logicalclock.get()) {
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}", Long.toHexString(n.electionEpoch), Long.toHexString(this.logicalclock.get()));
                                    continue;
                                }
                                // The election epochs are equal: adopt the received vote only if it beats our current proposal
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    this.sendNotifications();
                                }
                            }
                            LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}", new Object[]{n.sid, n.leader, Long.toHexString(n.zxid), Long.toHexString(n.electionEpoch)});
                            // After the comparisons above, record the sender's vote in recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            // Count which servers agree with our current proposal, to decide whether the election can end
                            voteSet = this.getVoteTracker(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch));
--------------------------------------------------------------------------------  
    // In short, this method builds a SyncedLearnerTracker for the given votes and target vote, adding as acks every server whose vote equals the target vote.
    protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(this.self.getQuorumVerifier());
        if (this.self.getLastSeenQuorumVerifier() != null && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
        }
        Iterator var4 = votes.entrySet().iterator();
        while(var4.hasNext()) {
            Entry<Long, Vote> entry = (Entry)var4.next();
            if (vote.equals(entry.getValue())) {
                voteSet.addAck((Long)entry.getKey());
            }
        }
        return voteSet;
    }                    
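--------------------------------------------------------------------------------
                            // hasAllQuorums() checks whether the servers whose vote equals our proposal make up more than half of every tracked quorum configuration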
                            if (!voteSet.hasAllQuorums()) {
                                continue;
                            }
-------------------------------------------------------------------------------- 
public boolean hasAllQuorums() {
        Iterator var1 = this.qvAcksetPairs.iterator();
        SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset;
        do {
            if (!var1.hasNext()) {
                return true;
            }
            qvAckset = (SyncedLearnerTracker.QuorumVerifierAcksetPair)var1.next();
        } while(qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()));
        return false;
    }           
public boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > this.half;
    }                      
-------------------------------------------------------------------------------- 
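                            // Before finishing, keep polling (200 ms each) to see whether a pending notification would still change our vote;
                            // if one beats our proposal, put it back in the queue and continue the election
                            while((n = (FastLeaderElection.Notification)this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {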
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.recvqueue.put(n);
                                    break;
                                }
                            }
                            if (n == null) {
                                // No better vote arrived: voting is over, so set this node's state (LEADING, FOLLOWING or OBSERVING)
                                this.setPeerState(this.proposedLeader, voteSet);
                                // Build the final vote, clear the receive queue (leaveInstance) and return the result
                                endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
                        break;
                    case OBSERVING: // Observers do not vote: just log and ignore
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
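                    case LEADING:
                        // The sender has already settled on a leader. Count its vote in this round; if a quorum agrees
                        // and checkLeader() confirms the leader, adopt the result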
                        if (n.electionEpoch == this.logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = this.getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && this.checkLeader(recvset, n.leader, n.electionEpoch)) {
                                this.setPeerState(n.leader, voteSet);
                                endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
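                        // Also track votes from servers outside this election round, so a node that joins late can still find the established leader
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));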
                        voteSet = this.getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && this.checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized(this) {
                                this.logicalclock.set(n.electionEpoch);
                                this.setPeerState(n.leader, voteSet);
                            }
                            endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            this.leaveInstance(endVote);
                            var7 = endVote;
                            return var7;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    }
                } else {
                    if (!this.validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!this.validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            n = null;
            return n;
        } finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            } catch (Exception var21) {
                LOG.warn("Failed to unregister with JMX", var21);
            }
            this.self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount());
        }
    }
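To make the LOOKING branch easier to follow, here is a minimal, simplified sketch of the decision it makes for each notification: a newer election round moves the logical clock forward and re-decides against the node's own initial vote, a stale round is ignored, and within the same round the received vote is adopted only if it ranks higher. The names `LookingVoteHandler`, `onNotification` and `beats` are hypothetical. The ordering in `beats` follows the peerEpoch, then zxid, then myid comparison that `totalOrderPredicate` performs. The sketch assumes every server has a non-zero weight and leaves out voter validation, the zxid == -1 checks, `recvset.clear()` and the vote tally.

```java
// Hypothetical, simplified sketch of the LOOKING branch; not the ZooKeeper implementation.
class LookingVoteHandler {

    final long myId, myZxid, myEpoch;          // this node's own initial vote
    long logicalClock;                          // election round (logicalclock)
    long proposedLeader, proposedZxid, proposedEpoch;

    LookingVoteHandler(long myId, long myZxid, long myEpoch, long logicalClock) {
        this.myId = myId;
        this.myZxid = myZxid;
        this.myEpoch = myEpoch;
        this.logicalClock = logicalClock;
        setProposal(myId, myZxid, myEpoch);     // start by voting for ourselves
    }

    // Same ordering idea as totalOrderPredicate (weights assumed > 0):
    // higher peer epoch wins, then larger zxid, then larger myid.
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) return newEpoch > curEpoch;
        if (newZxid != curZxid) return newZxid > curZxid;
        return newId > curId;
    }

    /** Returns true if this node should re-broadcast its (possibly updated) vote. */
    boolean onNotification(long leader, long zxid, long peerEpoch, long electionEpoch) {
        if (electionEpoch > logicalClock) {
            // Newer round: jump to it and choose between the received vote and our OWN initial vote.
            logicalClock = electionEpoch;
            if (beats(leader, zxid, peerEpoch, myId, myZxid, myEpoch)) {
                setProposal(leader, zxid, peerEpoch);
            } else {
                setProposal(myId, myZxid, myEpoch);
            }
            return true;                        // always broadcast after changing rounds
        }
        if (electionEpoch < logicalClock) {
            return false;                       // stale round: ignore
        }
        // Same round: adopt only if the received vote beats our CURRENT proposal.
        if (beats(leader, zxid, peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
            setProposal(leader, zxid, peerEpoch);
            return true;
        }
        return false;
    }

    private void setProposal(long leader, long zxid, long epoch) {
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

    public static void main(String[] args) {
        LookingVoteHandler node1 = new LookingVoteHandler(1, 0x100000004L, 1, 1);
        System.out.println(node1.onNotification(2, 0x100000004L, 1, 1)); // true: same round, larger myid
        System.out.println(node1.proposedLeader);                        // 2
        System.out.println(node1.onNotification(3, 0x100000009L, 1, 0)); // false: stale round
    }
}
```

Keeping the "newer round" and "same round" cases separate mirrors the source: after a round change the comparison is made against the node's initial vote (`getInitId()` and friends), while inside a round it is made against the current proposal.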

Overall flow


sendNotifications


private void sendNotifications() {
        Iterator var1 = this.self.getCurrentAndNextConfigVoters().iterator();
        while(var1.hasNext()) {
            long sid = (Long)var1.next();
            QuorumVerifier qv = this.self.getQuorumVerifier();
            FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, this.proposedLeader, this.proposedZxid, this.logicalclock.get(), ServerState.LOOKING, sid, this.proposedEpoch, qv.toString().getBytes());
            LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient), {} (myid), 0x{} (n.peerEpoch) ", new Object[]{this.proposedLeader, Long.toHexString(this.proposedZxid), Long.toHexString(this.logicalclock.get()), sid, this.self.getId(), Long.toHexString(this.proposedEpoch)});
            this.sendqueue.offer(notmsg);
        }
    }
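`sendNotifications()` builds the same vote once per voting member (current and next configuration) and only puts it on `sendqueue`. In the real code a separate WorkerSender thread drains that queue and hands the messages to the connection manager, which delivers a message addressed to the node itself straight to its own receive queue. The sketch below models just the fan-out step. `NotificationSketch` and its `ToSend` fields are hypothetical stand-ins for `FastLeaderElection.ToSend`, and networking is left out.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the fan-out in sendNotifications(); not ZooKeeper's API.
class NotificationSketch {

    // Illustrative stand-in for FastLeaderElection.ToSend.
    static final class ToSend {
        final long leader, zxid, electionEpoch, peerEpoch, recipientSid;

        ToSend(long leader, long zxid, long electionEpoch, long peerEpoch, long recipientSid) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
            this.recipientSid = recipientSid;
        }
    }

    // Queue one copy of the current proposal for every voter (including this node itself);
    // only the recipient differs between the messages.
    static void sendNotifications(List<Long> voters, long proposedLeader, long proposedZxid,
                                  long logicalClock, long proposedEpoch,
                                  BlockingQueue<ToSend> sendQueue) {
        for (long sid : voters) {
            sendQueue.offer(new ToSend(proposedLeader, proposedZxid, logicalClock, proposedEpoch, sid));
        }
    }

    public static void main(String[] args) {
        BlockingQueue<ToSend> sendQueue = new LinkedBlockingQueue<>();
        // Node 1 votes for itself and notifies the voters 1, 2 and 3.
        sendNotifications(List.of(1L, 2L, 3L), 1L, 0x100000004L, 1L, 1L, sendQueue);
        System.out.println(sendQueue.size()); // 3
    }
}
```

Queueing instead of sending inline keeps the election loop from blocking on slow or unreachable peers.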

When there are multiple nodes

Assume three nodes, which initially cast vote(1), vote(2) and vote(3) respectively.


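Suppose the node holding vote(1) receives the notification carrying vote(2) and finds that vote(2) ranks higher (in this example, because its epoch is larger). It then replaces vote(1) with vote(2), and its next broadcast also carries vote(2).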
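The three-node example can be reproduced with a toy simulation under simplifying assumptions: all nodes are in the same election round, every message is delivered, and each node starts by voting for itself. A node adopts any received vote that ranks higher (peerEpoch, then zxid, then myid), and the loop stops once no vote changes. The leader is the candidate backed by more than half of the nodes, mirroring the `ackSet.size() > half` test in `containsQuorum`. `ThreeNodeElection` and `elect` are hypothetical names, not ZooKeeper APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy simulation of vote exchange; networking, election epochs and
// message ordering are deliberately left out.
class ThreeNodeElection {

    // Same ordering idea as totalOrderPredicate: peerEpoch, then zxid, then myid.
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) return newEpoch > curEpoch;
        if (newZxid != curZxid) return newZxid > curZxid;
        return newId > curId;
    }

    /** Returns the elected leader's myid (index + 1), or -1 if no candidate has a majority. */
    static long elect(long[] zxids, long[] epochs) {
        int n = zxids.length;
        int[] proposal = new int[n];                 // proposal[i] = index of the node i votes for
        for (int i = 0; i < n; i++) proposal[i] = i; // everyone votes for itself first

        boolean changed = true;
        while (changed) {                            // repeat "broadcast" rounds until stable
            changed = false;
            for (int receiver = 0; receiver < n; receiver++) {
                for (int sender = 0; sender < n; sender++) {
                    int cand = proposal[sender];
                    int cur = proposal[receiver];
                    if (beats(cand + 1, zxids[cand], epochs[cand], cur + 1, zxids[cur], epochs[cur])) {
                        proposal[receiver] = cand;   // adopt the better vote
                        changed = true;
                    }
                }
            }
        }
        // Tally the final votes and apply the majority rule (more than half).
        Map<Integer, Integer> tally = new HashMap<>();
        for (int p : proposal) tally.merge(p, 1, Integer::sum);
        for (Map.Entry<Integer, Integer> e : tally.entrySet()) {
            if (e.getValue() > n / 2) return e.getKey() + 1;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Equal epochs and zxids: the largest myid (3) wins.
        System.out.println(elect(new long[]{5, 5, 5}, new long[]{1, 1, 1})); // 3
        // Node 1 holds the newest data, so it wins despite the smallest myid.
        System.out.println(elect(new long[]{7, 5, 5}, new long[]{1, 1, 1})); // 1
    }
}
```

With identical epochs and zxids the largest myid wins; a node holding a newer zxid wins regardless of its myid, which is what guarantees the elected leader has the most recent data.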
