Understanding ZooKeeper in Depth, Series 3: ZooKeeper Implementation Principles and Leader Election Source Code Analysis (Part 1): https://developer.aliyun.com/article/1413768
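How is it implemented? (leader)
If the elected leader's zxid is the largest in the whole cluster, that node's data is the most up to date. This is why the zxid plays a role in leader election.
Every new leader term comes with an epoch, and the epoch keeps increasing. A zxid is therefore made of two parts: the epoch plus an incrementing counter.
Together, these two mechanisms are how the design guarantees that the leader holds the newest data.
zxid (transaction id)
Every transaction is assigned an id that is unique across the whole cluster. To guarantee the sequential consistency of transactions, these ids increase monotonically. As mentioned earlier, every proposal is tagged with a new, larger zxid.
A zxid is a 64-bit number. The high 32 bits are the epoch, which changes each time a new leader takes over. The low 32 bits are an incrementing transaction counter.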
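The following minimal sketch illustrates the bit layout described above. It packs an epoch and a counter into one `long` and then splits them apart again. ZooKeeper ships a similar helper (`ZxidUtils` in `org.apache.zookeeper.server.util`). The standalone class below (`ZxidSketch`, a hypothetical name) only reproduces the arithmetic and is not that class.

```java
// Hypothetical standalone sketch of the zxid layout: high 32 bits = epoch,
// low 32 bits = per-epoch transaction counter.
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32; // shift the epoch down from the high half
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL; // keep only the low half
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);
        System.out.println(Long.toHexString(zxid));               // 200000005
        System.out.println(epochOf(zxid) + " " + counterOf(zxid)); // 2 5
        // a zxid from a newer epoch is larger even if its counter restarted at 0
        System.out.println(makeZxid(3, 0) > makeZxid(2, 0xffffffffL)); // true
    }
}
```

The epoch sits in the high bits, so any zxid from a newer epoch compares larger than every zxid from an older epoch. As a result, a plain numeric comparison is enough to favor the most up-to-date node.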
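Source analysis: what starting ZooKeeper probably involves
- Load the configuration (zoo.cfg)
- Initialize
- Open port 2181, a separate service for business requests, and listen for client requests (zkClient)
- Start listening on ports 2888 and 3888
- Initialize leader election
- Start leader election
- Load data from disk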
The code below is the startup flow from an older ZooKeeper release (the 3.4.x line):

```java
// QuorumPeerMain.main()
QuorumPeerMain main = new QuorumPeerMain();
try {
    main.initializeAndRun(args);
    // ...
// --------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
    // holds the global configuration
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // start a scheduled task that purges old snapshots and transaction logs
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),
            config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    // a cluster (quorum) configuration starts here
    if (args.length == 1 && config.servers.size() > 0) {
        this.runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        ZooKeeperServerMain.main(args); // otherwise start in standalone mode
    }
}
// --------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException var4) {
        LOG.warn("Unable to register log4j JMX control", var4);
    }
    LOG.info("Starting quorum peer");
    try {
        /*
         * ClientCnxn: the class the client uses for network interaction with the server
         * ServerCnxn: the server-side network communication handler
         */
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
        // quorumPeer represents one node of the cluster; the settings loaded at startup are stored on it
        this.quorumPeer = new QuorumPeer();
        this.quorumPeer.setClientPortAddress(config.getClientPortAddress());
        this.quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        this.quorumPeer.setQuorumPeers(config.getServers());
        this.quorumPeer.setElectionType(config.getElectionAlg()); // election algorithm
        this.quorumPeer.setMyid(config.getServerId());
        this.quorumPeer.setTickTime(config.getTickTime());        // heartbeat interval (tickTime)
        this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        this.quorumPeer.setInitLimit(config.getInitLimit()); // time allowed for the initial sync with the leader (in ticks)
        this.quorumPeer.setSyncLimit(config.getSyncLimit()); // time allowed for followers to stay in sync (in ticks)
        this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        this.quorumPeer.setCnxnFactory(cnxnFactory);
        // in-memory database, backed by on-disk persistence (snapshots + transaction log)
        this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory()));
        this.quorumPeer.setLearnerType(config.getPeerType());
        this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
        this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        this.quorumPeer.start(); // start the peer
        this.quorumPeer.join();  // quorumPeer is a thread: wait until it finishes
    } catch (InterruptedException var3) {
        LOG.warn("Quorum Peer interrupted", var3);
    }
}
```
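The following sketch makes the cluster-versus-standalone branch in `initializeAndRun()` concrete. It is a hypothetical, heavily simplified stand-in for `QuorumPeerConfig.parse()`. It assumes zoo.cfg can be read as a plain properties file, collects only the `server.N=host:2888:3888` entries, and treats a non-empty server list as cluster mode. This mirrors the 3.4.x check `config.servers.size() > 0`. The class name `ZooCfgSketch` and the sample addresses are made up. The real parser validates far more, and 3.6.1 uses `config.isDistributed()` instead.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for QuorumPeerConfig.parse(): it only looks at
// "server.N=host:peerPort:electionPort" lines and ignores every other setting.
class ZooCfgSketch {
    static Map<Long, String> parseServers(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText)); // zoo.cfg uses key=value lines
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<Long, String> servers = new TreeMap<>(); // sid -> "host:2888:3888"
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                long sid = Long.parseLong(key.substring("server.".length()));
                servers.put(sid, props.getProperty(key).trim());
            }
        }
        return servers;
    }

    // same idea as the 3.4.x check `config.servers.size() > 0`
    static boolean runAsCluster(Map<Long, String> servers) {
        return !servers.isEmpty();
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "tickTime=2000",
                "initLimit=10",
                "syncLimit=5",
                "dataDir=/tmp/zookeeper",
                "clientPort=2181",
                "server.1=127.0.0.1:2888:3888",
                "server.2=127.0.0.1:2889:3889",
                "server.3=127.0.0.1:2890:3890");
        Map<Long, String> servers = parseServers(cfg);
        System.out.println(servers.keySet() + " cluster=" + runAsCluster(servers));
    }
}
```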
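Leader Election: Source Code and Principles
Building the source
- Build with Maven
Starting the ZooKeeper server
- Cluster mode: QuorumPeerMain
- Standalone mode: ZooKeeperServerMain
Hypotheses
- Load the configuration from zoo.cfg
- Listen on port 2181 (NIO or Netty; NIO by default, as covered in the TCP protocol lesson)
- Initialize the listeners on ports 2888 (data synchronization) and 3888 (election), using blocking I/O (BIO)
- Initialize the election algorithm and run the election (leader)
- Load and recover the local files
- Synchronize data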
Source code analysis
Based on ZooKeeper 3.6.1

```java
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
        // ...
// ---------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // holds the global configuration
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]); // parse zoo.cfg and store the result in QuorumPeerConfig
    }
    // start a scheduled task that purges old snapshots and transaction logs
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),
            config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.isDistributed()) { // cluster mode
        this.runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        ZooKeeperServerMain.main(args); // standalone mode
    }
}
// ---------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException var17) {
        LOG.warn("Unable to register log4j JMX control", var17);
    }
    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider; // metrics
    // publishes this ZooKeeper server's metrics
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException var16) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), var16);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        /*
         * Related to listening on port 2181.
         * ClientCnxn: the class the client uses for network interaction with the server
         * ServerCnxn: the server-side network communication handler
         */
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null; // for secure client connections
        if (config.getClientPortAddress() != null) { // the default case
            cnxnFactory = ServerCnxnFactory.createFactory();
            // listen on the client port
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), true);
        }
        this.quorumPeer = this.getQuorumPeer();
        this.quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        this.quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        this.quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        this.quorumPeer.setElectionType(config.getElectionAlg()); // which election algorithm to use
        this.quorumPeer.setMyid(config.getServerId());            // myid
        this.quorumPeer.setTickTime(config.getTickTime());        // heartbeat interval (tickTime, e.g. 2000 ms)
        this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        this.quorumPeer.setInitLimit(config.getInitLimit()); // time allowed for the initial sync with the leader (in ticks)
        this.quorumPeer.setSyncLimit(config.getSyncLimit()); // time allowed for followers to stay in sync (in ticks)
        this.quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        this.quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        this.quorumPeer.setConfigFileName(config.getConfigFilename());
        this.quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        // in-memory database, backed by on-disk persistence (snapshots + transaction log)
        this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory()));
        this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            this.quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        this.quorumPeer.initConfigInZKDatabase();
        this.quorumPeer.setCnxnFactory(cnxnFactory);
        this.quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        this.quorumPeer.setSslQuorum(config.isSslQuorum());
        this.quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        this.quorumPeer.setLearnerType(config.getPeerType());
        this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
        this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            this.quorumPeer.getX509Util().enableCertFileReloading();
        }
        this.quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        this.quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        this.quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
        this.quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (this.quorumPeer.isQuorumSaslAuthEnabled()) {
            this.quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            this.quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            this.quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            this.quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            this.quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        this.quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        this.quorumPeer.initialize();
        if (config.jvmPauseMonitorToRun) {
            this.quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        this.quorumPeer.start(); // start the peer
        ZKAuditProvider.addZKStartStopAuditLog();
        this.quorumPeer.join();  // quorumPeer is a thread: wait until it finishes
    } catch (InterruptedException var15) {
        LOG.warn("Quorum Peer interrupted", var15);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable var14) {
                LOG.warn("Error while stopping metrics", var14);
            }
        }
    }
}
```
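Connecting to ZooKeeper's monitoring data
Once connected, you can see the metrics it exposes as attributes.
When we create a ZooKeeper node, the data goes into ZooKeeper's in-memory database, ZKDatabase. ZKDatabase holds a DataTree object, and the DataTree is where ZooKeeper mainly stores node data.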
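The following minimal sketch shows the idea of an in-memory data tree. It assumes a flat map keyed by full path plus a set of child names per node, in the spirit of DataTree, which also indexes its nodes by path. `MiniDataTree` and its methods are hypothetical. They leave out much of what the real `DataTree` also tracks, such as stat, ACLs, ephemeral owners and watches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified in-memory tree: nodes are stored in a flat map
// keyed by full path, and each node remembers the names of its children.
class MiniDataTree {
    static final class Node {
        final byte[] data;
        final Set<String> children = ConcurrentHashMap.newKeySet(); // child names, not paths

        Node(byte[] data) {
            this.data = data;
        }
    }

    // every node, indexed by its full path, e.g. "/app/c1"
    private final Map<String, Node> nodes = new ConcurrentHashMap<>();

    MiniDataTree() {
        nodes.put("/", new Node(new byte[0])); // the root always exists
    }

    void create(String path, byte[] data) {
        int idx = path.lastIndexOf('/');
        String parentPath = (idx == 0) ? "/" : path.substring(0, idx);
        Node parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IllegalArgumentException("parent does not exist: " + parentPath);
        }
        if (nodes.putIfAbsent(path, new Node(data)) != null) {
            throw new IllegalArgumentException("node already exists: " + path);
        }
        parent.children.add(path.substring(idx + 1));
    }

    byte[] getData(String path) {
        Node n = nodes.get(path);
        return (n == null) ? null : n.data;
    }

    Set<String> getChildren(String path) {
        Node n = nodes.get(path);
        return (n == null) ? new TreeSet<>() : new TreeSet<>(n.children); // sorted copy
    }

    public static void main(String[] args) {
        MiniDataTree tree = new MiniDataTree();
        tree.create("/app", "config".getBytes(StandardCharsets.UTF_8));
        tree.create("/app/c1", "x".getBytes(StandardCharsets.UTF_8));
        System.out.println(tree.getChildren("/app"));
    }
}
```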
```java
// entering QuorumPeer.start()
public synchronized void start() { // overrides Thread.start()
    if (!this.getView().containsKey(this.myid)) { // is this node's myid in the cluster configuration?
        throw new RuntimeException("My id " + this.myid + " not in the peer list");
    } else {
        this.loadDataBase();           // load data from disk
        this.startServerCnxnFactory(); // start listening on the client port (2181)
        try {
            this.adminServer.start();
        } catch (AdminServerException var2) {
            LOG.warn("Problem starting AdminServer", var2);
            System.out.println(var2);
        }
        this.startLeaderElection();  // start leader election
        this.startJvmPauseMonitor(); // monitoring
        super.start();               // start the thread
    }
}
```
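The method above follows a common pattern. It overrides `Thread.start()`, performs the setup synchronously on the caller's thread, and then calls `super.start()` so that `run()` executes on the new thread. The hypothetical class below shows that pattern in isolation. Its step names are only labels for the calls in `QuorumPeer.start()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the QuorumPeer pattern: setup happens inside an
// overridden start() on the caller's thread; run() only begins after super.start().
class PeerThreadSketch extends Thread {
    final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    @Override
    public synchronized void start() {
        steps.add("loadDataBase");           // label for loading snapshot + txn log
        steps.add("startServerCnxnFactory"); // label for opening the client port
        steps.add("startLeaderElection");    // label for creating the election algorithm
        super.start();                       // only now does run() begin, on a new thread
    }

    @Override
    public void run() {
        steps.add("run: main loop"); // QuorumPeer.run() would loop on its state here
    }

    public static void main(String[] args) throws InterruptedException {
        PeerThreadSketch peer = new PeerThreadSketch();
        peer.start();
        peer.join(); // like quorumPeer.join(): wait for the peer thread to finish
        System.out.println(peer.steps);
    }
}
```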
startServerCnxnFactory: start listening

```java
// ---------------------------------------------------------------
// initialization (in runFromConfig())
if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),
            config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(),
            config.getClientPortListenBacklog(), true);
}

public static ServerCnxnFactory createFactory() throws IOException {
    // read the JVM system property zookeeper.serverCnxnFactory
    String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
    if (serverCnxnFactoryName == null) { // not set: default to NIO
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        // instantiate the implementation by class name via reflection
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception var3) {
        IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, var3);
        throw ioe;
    }
}
// ---------------------------------------------------------------
// each factory implements configure() differently; this is NIOServerCnxnFactory's
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    } else {
        this.configureSaslLogin(); // SASL login configuration
        this.maxClientCnxns = maxcc;
        this.initMaxCnxns();
        this.sessionlessCnxnTimeout = Integer.getInteger("zookeeper.nio.sessionlessCnxnTimeout", 10000);
        this.cnxnExpiryQueue = new ExpiryQueue(this.sessionlessCnxnTimeout);
        this.expirerThread = new NIOServerCnxnFactory.ConnectionExpirerThread();
        int numCores = Runtime.getRuntime().availableProcessors(); // number of available processor cores
        this.numSelectorThreads = Integer.getInteger("zookeeper.nio.numSelectorThreads",
                Math.max((int) Math.sqrt((double) ((float) numCores / 2.0F)), 1));
        if (this.numSelectorThreads < 1) {
            throw new IOException("numSelectorThreads must be at least 1");
        } else {
            this.numWorkerThreads = Integer.getInteger("zookeeper.nio.numWorkerThreads", 2 * numCores);
            this.workerShutdownTimeoutMS = Long.getLong("zookeeper.nio.shutdownTimeout", 5000L);
            String logMsg = "Configuring NIO connection handler with " + this.sessionlessCnxnTimeout / 1000
                    + "s sessionless connection timeout, " + this.numSelectorThreads + " selector thread(s), "
                    + (this.numWorkerThreads > 0 ? this.numWorkerThreads : "no") + " worker threads, and "
                    + (directBufferBytes == 0 ? "gathered writes." : "" + directBufferBytes / 1024 + " kB direct buffers.");
            LOG.info(logMsg);
            // create as many selector threads as computed above
            for (int i = 0; i < this.numSelectorThreads; ++i) {
                this.selectorThreads.add(new NIOServerCnxnFactory.SelectorThread(i));
            }
            this.listenBacklog = backlog;
            this.ss = ServerSocketChannel.open(); // open a ServerSocketChannel
            this.ss.socket().setReuseAddress(true);
            LOG.info("binding to port {}", addr);
            if (this.listenBacklog == -1) {
                this.ss.socket().bind(addr); // bind the listening port
            } else {
                this.ss.socket().bind(addr, this.listenBacklog);
            }
            this.ss.configureBlocking(false); // non-blocking mode
            // AcceptThread accepts client connections;
            // selectorThreads handle the read/write events on their selectors
            this.acceptThread = new NIOServerCnxnFactory.AcceptThread(this.ss, addr, this.selectorThreads);
        }
    }
}
// -----------------------------------------------------------------------
private void startServerCnxnFactory() {
    if (this.cnxnFactory != null) {
        this.cnxnFactory.start(); // assigned earlier during configuration
    }
    if (this.secureCnxnFactory != null) {
        this.secureCnxnFactory.start(); // assigned earlier during configuration
    }
}

public void start() {
    this.stopped = false;
    if (this.workerPool == null) {
        this.workerPool = new WorkerService("NIOWorker", this.numWorkerThreads, false);
    }
    Iterator var1 = this.selectorThreads.iterator();
    while (var1.hasNext()) {
        NIOServerCnxnFactory.SelectorThread thread = (NIOServerCnxnFactory.SelectorThread) var1.next();
        if (thread.getState() == State.NEW) {
            // start() runs the thread's run() method, which loops on selector.select()
            // to poll for I/O events; when nothing is ready, select() blocks
            thread.start();
        }
    }
    if (this.acceptThread.getState() == State.NEW) {
        this.acceptThread.start(); // handles incoming client connections
    }
    // Worth borrowing: accepting connections and processing I/O are split across
    // different threads, and several selector threads poll in parallel, which
    // improves overall throughput.
    if (this.expirerThread.getState() == State.NEW) {
        this.expirerThread.start();
    }
}

// AcceptThread.run()
public void run() {
    try {
        while (!NIOServerCnxnFactory.this.stopped && !this.acceptSocket.socket().isClosed()) {
            try {
                this.select();
            } catch (RuntimeException var6) {
                NIOServerCnxnFactory.LOG.warn("Ignoring unexpected runtime exception", var6);
            } catch (Exception var7) {
                NIOServerCnxnFactory.LOG.warn("Ignoring unexpected exception", var7);
            }
        }
    } finally {
        this.closeSelector();
        if (!this.reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        NIOServerCnxnFactory.LOG.info("accept thread exitted run method");
    }
}

private void select() {
    try {
        this.selector.select(); // block until a client connection is ready to be accepted
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (!NIOServerCnxnFactory.this.stopped && selectedKeys.hasNext()) {
            SelectionKey key = (SelectionKey) selectedKeys.next();
            selectedKeys.remove();
            if (key.isValid()) {
                if (key.isAcceptable()) {
                    if (!this.doAccept()) {
                        this.pauseAccept(10L);
                    }
                } else {
                    NIOServerCnxnFactory.LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        }
    } catch (IOException var3) {
        NIOServerCnxnFactory.LOG.warn("Ignoring IOException while selecting", var3);
    }
}
```
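The default thread counts chosen in `configure()` come from simple arithmetic. The following sketch reproduces only that formula, so the numbers for a given machine are easy to check. It ignores the `zookeeper.nio.*` system-property overrides, and the class name is hypothetical.

```java
// Hypothetical helper that reproduces the default sizing formula from
// NIOServerCnxnFactory.configure(), without the system-property overrides.
class NioThreadSizing {
    // max((int) sqrt(numCores / 2), 1) selector threads
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((double) ((float) numCores / 2.0F)), 1);
    }

    // 2 * numCores worker threads
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 4, 8, 32}) {
            System.out.println(cores + " cores -> " + selectorThreads(cores)
                    + " selector thread(s), " + workerThreads(cores) + " worker threads");
        }
        int here = Runtime.getRuntime().availableProcessors();
        System.out.println("this machine (" + here + " cores): " + selectorThreads(here) + " selector thread(s)");
    }
}
```

Under this formula, 1 and 4 cores each get 1 selector thread, 8 cores get 2, and 32 cores get 4. Worker threads default to twice the core count. Selector threads therefore grow much more slowly than the worker pool.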
startLeaderElection: start leader election

```java
public synchronized void startLeaderElection() {
    try {
        // if this node is LOOKING
        if (this.getPeerState() == QuorumPeer.ServerState.LOOKING) {
            // initial vote for itself: Vote(myid, zxid, epoch)
            this.currentVote = new Vote(this.myid, this.getLastLoggedZxid(), this.getCurrentEpoch());
        }
    } catch (IOException var3) {
        RuntimeException re = new RuntimeException(var3.getMessage());
        re.setStackTrace(var3.getStackTrace());
        throw re;
    }
    // create the election algorithm according to electionType
    this.electionAlg = this.createElectionAlgorithm(this.electionType);
}
// ------------------------------------------------------------------------------
// In 3.6, whatever election algorithm is configured, only one is actually supported:
// as the source shows, types 1 and 2 simply throw an exception.
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    switch (electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            // "Cnxn" classes deal with network connections (ServerCnxn, ClientCnxn);
            // QuorumCnxManager manages the connections used for election voting
            QuorumCnxManager qcm = this.createCnxnManager();
            QuorumCnxManager oldQcm = (QuorumCnxManager) this.qcmRef.getAndSet(qcm);
            if (oldQcm != null) { // a manager was already set up (e.g. leader election is restarting)
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt(); // stop the previous connection manager
            }
            // the listener accepts connections from the other nodes so that their votes can be received
            Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();
                // create FastLeaderElection
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start(); // starts the messenger threads; the voting itself runs later in lookForLeader()
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
    }
    return le;
}
// ---------------------------------------------------------------------------
// entering start(): FastLeaderElection.start()
public void start() {
    this.messenger.start();
}

// Messenger.start(): starts two threads
void start() {
    /*
     * This shows that ZooKeeper's election flow is essentially asynchronous:
     * WorkerSender sends votes, WorkerReceiver receives votes.
     */
    this.wsThread.start();
    this.wrThread.start();
}

protected class Messenger {
    FastLeaderElection.Messenger.WorkerSender ws;
    FastLeaderElection.Messenger.WorkerReceiver wr;
    Thread wsThread = null;
    Thread wrThread = null;

    Messenger(QuorumCnxManager manager) {
        // initialize the two worker threads (both daemon threads)
        this.ws = new FastLeaderElection.Messenger.WorkerSender(manager);
        this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
        this.wsThread.setDaemon(true);
        this.wr = new FastLeaderElection.Messenger.WorkerReceiver(manager);
        this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
        this.wrThread.setDaemon(true);
    }
    // ...
```
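The following sketch shows the asynchronous design of the Messenger. The election logic only offers messages to a send queue and polls a receive queue, while two daemon threads move data in the background. This is an illustration under assumptions: the class is hypothetical, the messages are plain strings, and the network is replaced by an in-memory "wire" queue that loops every message back to the sender. The real WorkerSender and WorkerReceiver go through `QuorumCnxManager` instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the Messenger idea: two daemon "worker" threads decouple
// the election logic (sendqueue / recvqueue) from the transport (here: a loopback queue).
class MessengerSketch {
    final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> wire = new LinkedBlockingQueue<>(); // stands in for the network

    void start() {
        Thread ws = new Thread(() -> pump(sendqueue, wire), "WorkerSender");
        Thread wr = new Thread(() -> pump(wire, recvqueue), "WorkerReceiver");
        ws.setDaemon(true); // daemon threads do not keep the JVM alive
        wr.setDaemon(true);
        ws.start();
        wr.start();
    }

    // move messages from one queue to another until interrupted
    private static void pump(BlockingQueue<String> from, BlockingQueue<String> to) {
        try {
            while (true) {
                to.put(from.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MessengerSketch m = new MessengerSketch();
        m.start();
        m.sendqueue.offer("vote(myid=1, zxid=0x0, epoch=1)");
        // like lookForLeader(): poll with a timeout instead of blocking forever
        System.out.println(m.recvqueue.poll(1, TimeUnit.SECONDS));
    }
}
```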
All of the logic above is shown in the figure.
Final execution
Everything read so far is initialization logic. The real work starts with the super.start() call at the end of QuorumPeer.start(). Since QuorumPeer is a thread, this call starts it and runs QuorumPeer.run().

```java
// entering QuorumPeer.start()
public synchronized void start() { // overrides Thread.start()
    if (!this.getView().containsKey(this.myid)) { // is this node's myid in the cluster configuration?
        throw new RuntimeException("My id " + this.myid + " not in the peer list");
    } else {
        this.loadDataBase();           // load data from disk
        this.startServerCnxnFactory(); // start listening on the client port (2181)
        try {
            this.adminServer.start();
        } catch (AdminServerException var2) {
            LOG.warn("Problem starting AdminServer", var2);
            System.out.println(var2);
        }
        this.startLeaderElection();  // start leader election
        this.startJvmPauseMonitor(); // monitoring
        super.start();               // start the thread -> QuorumPeer.run()
    }
}
// --------------------------------------------------------------------------
public void run() {
    this.updateThreadName();
    LOG.debug("Starting quorum peer");
    // this whole block registers JMX MBeans, i.e. exposes metrics/monitoring data
    try {
        this.jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(this.jmxQuorumBean, (ZKMBeanInfo) null);
        Iterator var1 = this.getView().values().iterator();
        while (var1.hasNext()) {
            QuorumPeer.QuorumServer s = (QuorumPeer.QuorumServer) var1.next();
            if (this.getId() == s.id) {
                LocalPeerBean p = this.jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, this.jmxQuorumBean);
                } catch (Exception var88) {
                    LOG.warn("Failed to register with JMX", var88);
                    this.jmxLocalPeerBean = null;
                }
            } else {
                RemotePeerBean rBean = new RemotePeerBean(this, s);
                try {
                    MBeanRegistry.getInstance().register(rBean, this.jmxQuorumBean);
                    this.jmxRemotePeerBean.put(s.id, rBean);
                } catch (Exception var87) {
                    LOG.warn("Failed to register with JMX", var87);
                }
            }
        }
    } catch (Exception var92) {
        LOG.warn("Failed to register with JMX", var92);
        this.jmxQuorumBean = null;
    }
    // main loop: runs for as long as the peer is running
    while (true) {
        boolean var27 = false;
        try {
            var27 = true;
            if (!this.running) {
                var27 = false;
                break;
            }
            switch (this.getPeerState()) { // this node's state (LOOKING / LEADING / FOLLOWING / OBSERVING)
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1L);
                    // read-only mode is enabled
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(this.logFactory, this, this.zkDb);
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    sleep((long) Math.max(2000, QuorumPeer.this.tickTime));
                                    if (QuorumPeer.ServerState.LOOKING.equals(QuorumPeer.this.getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException var2) {
                                    QuorumPeer.LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception var3) {
                                    QuorumPeer.LOG.error("FAILED to start ReadOnlyZooKeeperServer", var3);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var85) {
                            LOG.warn("Unexpected exception", var85);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        } finally {
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            this.reconfigFlagClear();
                            if (this.shuttingDownLE) {
                                this.shuttingDownLE = false;
                                this.startLeaderElection();
                            }
                            // lookForLeader() blocks until the election finishes and returns the
                            // winning vote, i.e. the leader's vote agreed on by a quorum. This node
                            // takes part in the election, and before returning, lookForLeader() has
                            // already set this node's new state.
                            this.setCurrentVote(this.makeLEStrategy().lookForLeader());
                        } catch (Exception var84) {
                            LOG.warn("Unexpected exception", var84);
                            this.setPeerState(QuorumPeer.ServerState.LOOKING);
                        }
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        try {
                            // this node won the election: create a Leader and start leading
                            this.setLeader(this.makeLeader(this.logFactory));
                            this.leader.lead();
                            this.setLeader((Leader) null);
                        } catch (Exception var80) {
                            LOG.warn("Unexpected exception", var80);
                        }
                        break;
                    } finally {
                        if (this.leader != null) {
                            this.leader.shutdown("Forcing shutdown");
                            this.setLeader((Leader) null);
                        }
                        this.updateServerState();
                    }
                case FOLLOWING:
                    try {
                        try {
                            LOG.info("FOLLOWING");
                            this.setFollower(this.makeFollower(this.logFactory));
                            this.follower.followLeader(); // as a follower, connect to the leader
                        } catch (Exception var81) {
                            LOG.warn("Unexpected exception", var81);
                        }
                        break;
                    } finally {
                        this.follower.shutdown();
                        this.setFollower((Follower) null);
                        this.updateServerState();
                    }
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        this.setObserver(this.makeObserver(this.logFactory));
                        this.observer.observeLeader();
                    } catch (Exception var83) {
                        LOG.warn("Unexpected exception", var83);
                    } finally {
                        this.observer.shutdown();
                        this.setObserver((Observer) null);
                        this.updateServerState();
                        if (this.isRunning()) {
                            Observer.waitForObserverElectionDelay();
                        }
                    }
            }
        } finally {
            if (var27) {
                LOG.warn("QuorumPeer main thread exited");
                MBeanRegistry instance = MBeanRegistry.getInstance();
                instance.unregister(this.jmxQuorumBean);
                instance.unregister(this.jmxLocalPeerBean);
                Iterator var12 = this.jmxRemotePeerBean.values().iterator();
                while (var12.hasNext()) {
                    RemotePeerBean remotePeerBean = (RemotePeerBean) var12.next();
                    instance.unregister(remotePeerBean);
                }
                this.jmxQuorumBean = null;
                this.jmxLocalPeerBean = null;
                this.jmxRemotePeerBean = null;
            }
        }
    }
    LOG.warn("QuorumPeer main thread exited");
    MBeanRegistry instance = MBeanRegistry.getInstance();
    instance.unregister(this.jmxQuorumBean);
    instance.unregister(this.jmxLocalPeerBean);
    Iterator var96 = this.jmxRemotePeerBean.values().iterator();
    while (var96.hasNext()) {
        RemotePeerBean remotePeerBean = (RemotePeerBean) var96.next();
        instance.unregister(remotePeerBean);
    }
    this.jmxQuorumBean = null;
    this.jmxLocalPeerBean = null;
    this.jmxRemotePeerBean = null;
}
```
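The following sketch reduces the `run()` loop above to the state machine at its core. While the peer is LOOKING it runs an election, and the result decides whether it leads or follows. When a role ends, the peer falls back to LOOKING, as `updateServerState()` does in the normal, non-reconfiguration case. Everything here is hypothetical. The election is an injected `LongSupplier` that returns the winning myid, the roles are log entries, and observers (which would go to OBSERVING) are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the QuorumPeer.run() state machine for a voting participant.
class PeerStateMachineSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    final long myid;
    ServerState state = ServerState.LOOKING;
    final List<String> log = new ArrayList<>();

    PeerStateMachineSketch(long myid) {
        this.myid = myid;
    }

    // run a fixed number of loop iterations; lookForLeader returns the winning myid
    void runRounds(int rounds, LongSupplier lookForLeader) {
        for (int i = 0; i < rounds; i++) {
            switch (state) {
                case LOOKING:
                    long leader = lookForLeader.getAsLong();
                    // lead if the winning vote is my own id, otherwise follow
                    state = (leader == myid) ? ServerState.LEADING : ServerState.FOLLOWING;
                    log.add("LOOKING -> " + state);
                    break;
                case LEADING:
                    log.add("lead()");           // would block while leading
                    state = ServerState.LOOKING; // role ended: back to election
                    break;
                case FOLLOWING:
                    log.add("followLeader()");
                    state = ServerState.LOOKING;
                    break;
                case OBSERVING:
                    log.add("observeLeader()");
                    state = ServerState.LOOKING;
                    break;
            }
        }
    }

    public static void main(String[] args) {
        PeerStateMachineSketch peer = new PeerStateMachineSketch(3);
        peer.runRounds(3, () -> 3L); // pretend node 3 always wins
        System.out.println(peer.log);
    }
}
```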
makeLEStrategy().lookForLeader(): the leader election algorithm
As the source analyzed earlier shows, there is really only one election algorithm inside: FastLeaderElection.

```java
public Vote lookForLeader() throws InterruptedException {
    // JMX-related again: monitoring data
    try {
        this.self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
    } catch (Exception var24) {
        LOG.warn("Failed to register with JMX", var24);
        this.self.jmxLeaderElectionBean = null;
    }
    this.self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap();
        Map<Long, Vote> outofelection = new HashMap();
        int notTimeout = minNotificationInterval;
        synchronized (this) {
            // logicalclock is the logical clock: the election round (electionEpoch)
            this.logicalclock.incrementAndGet();
            // start by proposing itself: (myid, last logged zxid, peer epoch)
            this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
        }
        LOG.info("New election. My id = {}, proposed zxid=0x{}", this.self.getId(), Long.toHexString(this.proposedZxid));
        // broadcast this node's vote to the other voters; since every node does the same,
        // each node receives every other node's vote
        this.sendNotifications();
        FastLeaderElection.Notification n;
        // keep looping as long as this node is LOOKING
        while (this.self.getPeerState() == ServerState.LOOKING && !this.stop) {
            // take a Notification (a vote from some node in the cluster) from the receive queue
            n = (FastLeaderElection.Notification) this.recvqueue.poll((long) notTimeout, TimeUnit.MILLISECONDS);
            if (n == null) { // nothing received within the timeout
                if (this.manager.haveDelivered()) {
                    this.sendNotifications(); // resend our vote
                } else {
                    this.manager.connectAll(); // reconnect to all peers
                }
                // back off: double the timeout, capped at maxNotificationInterval
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            // otherwise validate the notification first: sid (the sender) and
            // leader (the proposed leader) are both myids and must be voting members
            } else if (this.validVoter(n.sid) && this.validVoter(n.leader)) {
                SyncedLearnerTracker voteSet;
                Vote endVote;
                Vote var7;
                switch (n.state) { // state of the node that sent the notification
                    case LOOKING: // during the election phase, the senders are also LOOKING
                        if (this.getInitLastLoggedZxid() == -1L) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                        } else if (n.zxid == -1L) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        } else {
                            // the sender's election round is newer than ours:
                            // our round is stale, so catch up before comparing votes
                            if (n.electionEpoch > this.logicalclock.get()) {
                                this.logicalclock.set(n.electionEpoch); // adopt the sender's round
                                recvset.clear(); // discard the votes collected so far and count again
                                // compare the received vote with this node's own initial vote
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                    // switch to the received vote (leader, zxid, epoch)
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                                }
                                // broadcast the updated vote; whether it is our own or another
                                // node's depends on epoch, zxid and myid
                                this.sendNotifications();
                            } else {
                                // the sender's round is older than ours: it cannot affect our vote
                                if (n.electionEpoch < this.logicalclock.get()) {
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                            Long.toHexString(n.electionEpoch), Long.toHexString(this.logicalclock.get()));
                                    continue;
                                }
                                // same round: switch only if the received vote beats our current proposal
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    this.sendNotifications();
                                }
                            }
                            LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                                    new Object[]{n.sid, n.leader, Long.toHexString(n.zxid), Long.toHexString(n.electionEpoch)});
                            // after the comparisons above, record the sender's vote in recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            // count the recorded votes that match our current proposal
                            voteSet = this.getVoteTracker(recvset, new Vote(this.proposedLeader, this.proposedZxid,
                                    this.logicalclock.get(), this.proposedEpoch));
// ---- getVoteTracker() ---------------------------------------------------------
// Builds a SyncedLearnerTracker for the given vote: every sid in `votes` whose vote
// equals `vote` is added as an ack, checked against the current quorum config
// (and the last-seen config as well, if that one is newer).
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(this.self.getQuorumVerifier());
    if (this.self.getLastSeenQuorumVerifier() != null
            && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
    }
    Iterator var4 = votes.entrySet().iterator();
    while (var4.hasNext()) {
        Entry<Long, Vote> entry = (Entry) var4.next();
        if (vote.equals(entry.getValue())) {
            voteSet.addAck((Long) entry.getKey());
        }
    }
    return voteSet;
}
// ---- back in lookForLeader(): keep collecting votes unless our current ----
// ---- proposal is backed by more than half of the voters             ----
                            if (!voteSet.hasAllQuorums()) {
                                continue;
                            }
// ---- hasAllQuorums() / containsQuorum() ----------------------------------------
public boolean hasAllQuorums() {
    Iterator var1 = this.qvAcksetPairs.iterator();
    SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset;
    do {
        if (!var1.hasNext()) {
            return true;
        }
        qvAckset = (SyncedLearnerTracker.QuorumVerifierAcksetPair) var1.next();
    } while (qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()));
    return false;
}

public boolean containsQuorum(Set<Long> ackSet) {
    return ackSet.size() > this.half; // strictly more than half: a majority
}
// ---- back in lookForLeader() ---------------------------------------------------
                            // quorum reached: wait up to 200 ms more to see whether a better vote
                            // arrives; if one does, put it back in the queue and keep electing
                            while ((n = (FastLeaderElection.Notification) this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.recvqueue.put(n);
                                    break;
                                }
                            }
                            if (n == null) {
                                // no better vote arrived: the election is over.
                                // LEADING if the winner is this node, otherwise FOLLOWING/OBSERVING
                                this.setPeerState(this.proposedLeader, voteSet);
                                // the final, agreed vote
                                endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // the sender has already finished its election and follows/leads
                        if (n.electionEpoch == this.logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = this.getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid,
                                    n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && this.checkLeader(recvset, n.leader, n.electionEpoch)) {
                                this.setPeerState(n.leader, voteSet);
                                endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                this.leaveInstance(endVote);
                                var7 = endVote;
                                return var7;
                            }
                        }
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = this.getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid,
                                n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && this.checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                this.logicalclock.set(n.electionEpoch);
                                this.setPeerState(n.leader, voteSet);
                            }
                            endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            this.leaveInstance(endVote);
                            var7 = endVote;
                            return var7;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                }
            } else {
                if (!this.validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!this.validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        n = null;
        return n;
    } finally {
        try {
            if (this.self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
            }
        } catch (Exception var21) {
            LOG.warn("Failed to unregister with JMX", var21);
        }
        this.self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount());
    }
}
```
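Two checks in `lookForLeader()` decide the outcome, and the following simplified sketch isolates them. The first, `totalOrderPredicate`, decides whether a received vote beats the current proposal by comparing peer epoch first, then zxid, then myid. The real method also rejects servers whose quorum weight is 0, which is omitted here. The second is a majority test in the spirit of `getVoteTracker()` plus `containsQuorum()`: it counts the voters whose vote equals the proposal and requires strictly more than half of the voting members. The class and its `Vote` type are hypothetical stand-ins, not ZooKeeper's classes.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified versions of the two checks used by lookForLeader().
class VoteRulesSketch {
    static final class Vote {
        final long leader, zxid, peerEpoch;

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return leader == v.leader && zxid == v.zxid && peerEpoch == v.peerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, zxid, peerEpoch);
        }
    }

    // true if the new vote beats the current one: epoch, then zxid, then myid
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    // majority check: more than half of the voting members agree with the proposal
    static boolean hasQuorum(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        int half = votingMembers / 2;
        long acks = recvset.values().stream().filter(v -> v.equals(proposal)).count();
        return acks > half;
    }
}
```

Because of this ordering, a node with newer data (higher epoch or zxid) always wins over a node that merely has a larger myid. The myid only breaks ties.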
Overall flow
sendNotifications

```java
private void sendNotifications() {
    // one notification per voter in the current (and, during reconfiguration, the next) config
    Iterator var1 = this.self.getCurrentAndNextConfigVoters().iterator();
    while (var1.hasNext()) {
        long sid = (Long) var1.next();
        QuorumVerifier qv = this.self.getQuorumVerifier();
        // carries this node's current proposal (leader, zxid, peer epoch) and its election round
        FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification,
                this.proposedLeader, this.proposedZxid, this.logicalclock.get(), ServerState.LOOKING, sid,
                this.proposedEpoch, qv.toString().getBytes());
        LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient), {} (myid), 0x{} (n.peerEpoch) ",
                new Object[]{this.proposedLeader, Long.toHexString(this.proposedZxid),
                        Long.toHexString(this.logicalclock.get()), sid, this.self.getId(),
                        Long.toHexString(this.proposedEpoch)});
        // only queued here; the WorkerSender thread does the actual sending
        this.sendqueue.offer(notmsg);
    }
}
```
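When there are multiple nodes
Assume three nodes whose initial votes are vote(1), vote(2) and vote(3), with each node voting for itself.
Suppose node 1, currently proposing vote(1), receives a notification carrying vote(2). It finds that vote(2) wins the comparison: a higher epoch, or an equal epoch with a higher zxid, or both equal with a higher myid. Node 1 then switches its proposal from vote(1) to vote(2), and the next notification it sends out carries vote(2).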
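The following toy simulation of the three-node example is synchronous. Every node starts by voting for itself. In each round, every node reads all current proposals and switches to any vote that wins the (epoch, zxid, myid) comparison. Finally, a leader needs more than half of the votes. The simulation ignores network delays, timeouts and the electionEpoch/logicalclock handling of the real FastLeaderElection. `ThreeNodeElectionSketch` and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, synchronous toy simulation of vote convergence (not the real algorithm).
class ThreeNodeElectionSketch {
    // proposal layout: {leader, zxid, epoch}; true if a beats b
    static boolean beats(long[] a, long[] b) {
        if (a[2] != b[2]) return a[2] > b[2]; // epoch first
        if (a[1] != b[1]) return a[1] > b[1]; // then zxid
        return a[0] > b[0];                   // then myid
    }

    static long elect(long[] myids, long[] zxids, long epoch) {
        Map<Long, long[]> proposals = new HashMap<>();
        for (int i = 0; i < myids.length; i++) {
            proposals.put(myids[i], new long[] {myids[i], zxids[i], epoch}); // vote(1), vote(2), ...
        }
        for (int round = 0; round < myids.length; round++) {
            // like sendNotifications(): everyone sees everyone's current proposal
            List<long[]> broadcast = new ArrayList<>(proposals.values());
            for (long id : myids) {
                for (long[] received : broadcast) {
                    if (beats(received, proposals.get(id))) {
                        proposals.put(id, received); // e.g. vote(1) -> vote(2) -> vote(3)
                    }
                }
            }
        }
        // tally the final proposals; a leader needs more than half of all nodes
        Map<Long, Integer> tally = new HashMap<>();
        for (long[] p : proposals.values()) {
            tally.merge(p[0], 1, Integer::sum);
        }
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > myids.length / 2) return e.getKey();
        }
        return -1; // no quorum
    }

    public static void main(String[] args) {
        System.out.println(elect(new long[] {1, 2, 3}, new long[] {0x100, 0x100, 0x100}, 1));
    }
}
```

With equal epochs and zxids, every node proposes node 3 after one exchange, so node 3 wins with 3 of 3 votes. If node 1 had a larger zxid, every node would switch to vote(1) instead, because zxid is compared before myid.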