## Source Entry Point
Whether ZooKeeper starts in standalone or cluster mode, the first part of the startup flow is almost identical, all the way to the `initializeAndRun()` method of `QuorumPeerMain.java`: in standalone mode it runs `ZooKeeperServerMain.main(args);`, while in cluster mode it runs `runFromConfig(config);`. This post therefore starts from `QuorumPeerMain`'s `runFromConfig()`.
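For reference, the branch in `initializeAndRun()` that picks between the two modes looks roughly like this (abbreviated from the 3.4.x source):

```java
// QuorumPeerMain.initializeAndRun(), abbreviated: run as a quorum member when
// servers are configured, otherwise fall back to standalone mode.
if (args.length == 1 && config.servers.size() > 0) {
    runFromConfig(config);          // cluster mode
} else {
    LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
    // there is only one server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}
```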
`QuorumPeer.java` can be seen as one server instance in the ZK cluster; most of the code below is devoted to initializing the attributes of the current server.
```java
// todo: the cluster startup logic
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());

        // todo: new QuorumPeer() can be understood as creating one server of the cluster
        quorumPeer = getQuorumPeer();

        // todo: copy the values parsed from the config file, unchanged, onto the new QuorumPeer
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }

        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // todo: the key method to follow
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
```
Following into the `quorumPeer.start()` method, whose source is below, it mainly does the following:

- Data recovery: loads the snapshot and transaction logs from disk into memory
- Starts the connection factory created above; it is a thread class, and starting it gives the current server the ability to accept client requests (but since the RequestProcessor chain has not been initialized yet, the server can accept requests without being able to process them)
- Leader election: a leader is elected from among the servers, establishing the three cluster roles of Leader, Follower, and Observer
- Starts the current thread class (`QuorumPeer` itself extends `Thread`)
`QuorumPeer.java`:

```java
@Override
public synchronized void start() {
    // todo: load data from disk into memory
    loadDataBase();
    // todo: start the factory from the context above -- a thread class that accepts client requests
    cnxnFactory.start();
    // todo: kick off the leader election
    startLeaderElection();
    // todo: determine the server's role; this starts the run() method of this class (around line 900)
    super.start();
}
```
Now look at the `run()` method of `QuorumPeer.java` (partial source below). The logic is clear: once the election above has completed, each node's role in the cluster has been determined, and nodes with different roles enter the corresponding case branches below:

- LOOKING: leader election in progress
- OBSERVING: an observer
- LEADING: the cluster's leader
- FOLLOWING: a follower in the cluster
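These four values come from the `ServerState` enum defined in `QuorumPeer.java`:

```java
// QuorumPeer.ServerState (3.4.x): the state the run() loop switches on
public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}
```

The `run()` loop switches on this state: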
```java
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        LOG.info("LOOKING");
        if (Boolean.getBoolean("readonlymode.enabled")) {
            LOG.info("Attempting to start ReadOnlyZooKeeperServer");
            // Create read-only server but don't start it immediately
            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                    logFactory, this,
                    new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb);
            Thread roZkMgr = new Thread() {
                public void run() {
                    try {
                        // lower-bound grace period to 2 secs
                        sleep(Math.max(2000, tickTime));
                        if (ServerState.LOOKING.equals(getPeerState())) {
                            roZk.startup();
                        }
                    } catch (InterruptedException e) {
                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                    } catch (Exception e) {
                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                    }
                }
            };
            try {
                roZkMgr.start();
                setBCVote(null);
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            } finally {
                roZkMgr.interrupt();
                roZk.shutdown();
            }
        } else {
            try {
                setBCVote(null);
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            }
        }
        break;
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            observer.shutdown();
            setObserver(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case FOLLOWING:
        // todo: this server has been elected as a follower
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            follower.shutdown();
            setFollower(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case LEADING:
        // todo: this server has been elected as the leader
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            // todo: follow into lead()
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    }
}
```
Next, let's look at what a server does after being elected into each of these roles.
## Overview: Leader & Follower
### Becoming the Leader
Following the source: in the code snippet above, `makeLeader()` creates the Leader wrapper object:
```java
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    // todo: follow into the Leader constructor
    return new Leader(this,
            new LeaderZooKeeperServer(logFactory, this,
                    new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
```
Here is the inheritance diagram of `LeaderZooKeeperServer`; as it shows, it actually extends the standalone-mode `ZooKeeperServer`.
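In case the diagram does not render, here is a minimal sketch that prints the chain yourself via reflection; it assumes the ZooKeeper 3.4.x jar is on the classpath, and the class name `PrintHierarchy` is just illustrative:

```java
// Minimal sketch: walk LeaderZooKeeperServer's superclass chain via reflection.
// Assumes a ZooKeeper 3.4.x jar on the classpath; expected output is roughly:
//   LeaderZooKeeperServer -> QuorumZooKeeperServer -> ZooKeeperServer -> Object
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;

public class PrintHierarchy {
    public static void main(String[] args) {
        for (Class<?> c = LeaderZooKeeperServer.class; c != null; c = c.getSuperclass()) {
            System.out.println(c.getName());
        }
    }
}
```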
Then `leader.lead()` is called; this method mainly does the following:

- Creates a `StateSummary` object
- This object wraps the `zxid` and the `currentEpoch`: the zxid is the id of the last znode-related transaction, and the latter is the current epoch. A zxid is 64 bits long; the high 32 bits mark which generation of leader this is, the low 32 bits count the transactions committed by the current generation's leader, and a follower recognizes as leader only a server with the newer epoch in the high 32 bits (a small sketch of this bit layout follows the list)
- Starts a new `LearnerCnxAcceptor` thread, which is responsible for the IO between the Leader and the Learners (Observers + Followers)
- In `LearnerCnxAcceptor`'s `run()` method, whenever a new connection arrives, a new `LearnerHandler` thread is started; each handler receives the packets of one learner on the Leader's side, while the acceptor keeps listening for new connections
- The leader server starts...
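Before the `lead()` source, a minimal self-contained sketch of the zxid bit layout described above; it mirrors what `ZxidUtils` does in the ZooKeeper source (the class name `ZxidLayoutDemo` is illustrative):

```java
// Sketch of the 64-bit zxid layout: high 32 bits = leader epoch (generation),
// low 32 bits = transaction counter within that epoch. Mirrors ZxidUtils.
public class ZxidLayoutDemo {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }
    static long getEpoch(long zxid)   { return zxid >> 32L; }
    static long getCounter(long zxid) { return zxid & 0xffffffffL; }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);  // 3rd leader generation, 7th transaction
        System.out.println(Long.toHexString(zxid));  // 300000007
        System.out.println(getEpoch(zxid));          // 3
        System.out.println(getCounter(zxid));        // 7
    }
}
```

Note how `lead()` below uses exactly this: `zk.setZxid(ZxidUtils.makeZxid(epoch, 0))` resets the counter to 0 for the new epoch. Now the `lead()` source itself: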
```java
void lead() throws IOException, InterruptedException {
    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    try {
        self.tick.set(0);
        zk.loadData();
        // todo: create the object that encapsulates the state-comparison logic
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
        // todo: create a new thread for new followers to connect to
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        readyToStart = true;
        // todo
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        synchronized (this) {
            lastProposed = zk.getZxid();
        }
        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of "
                    + Long.toHexString(newLeaderProposal.packet.getZxid()));
        }
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);
        try {
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                    + getSidSetString(newLeaderProposal.ackSet) + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();
            for (LearnerHandler f : learners)
                followerSet.add(f.getSid());
            if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                LOG.warn("Enough followers present. "
                        + "Perhaps the initTicks need to be increased.");
            }
            Thread.sleep(self.tickTime);
            self.tick.incrementAndGet();
            return;
        }
        // todo: start the server
        startZkServer();
        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }
        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.cnxnFactory.setZooKeeperServer(zk);
        }
        boolean tickSkip = true;
        while (true) {
            Thread.sleep(self.tickTime / 2);
            if (!tickSkip) {
                self.tick.incrementAndGet();
            }
            HashSet<Long> syncedSet = new HashSet<Long>();
            // lock on the followers when we use it.
            syncedSet.add(self.getId());
            for (LearnerHandler f : getLearners()) {
                // Synced set is used to check we have a supporting quorum, so only
                // PARTICIPANT, not OBSERVER, learners should be used
                if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                    syncedSet.add(f.getSid());
                }
                f.ping();
            }
            // check leader running status
            if (!this.isRunning()) {
                shutdown("Unexpected internal error");
                return;
            }
            if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                // Lost quorum, shutdown
                shutdown("Not sufficient followers synced, only synced with sids: [ "
                        + getSidSetString(syncedSet) + " ]");
                // make sure the order is the same!
                // the leader goes to looking
                return;
            }
            tickSkip = !tickSkip;
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
```
### Becoming a Follower
Via the case branch above, the server enters the FOLLOWING block and calls the `followLeader()` method.
The main logic of the code below, from `Follower.java`:

- Establishes the connection to the Leader
- `registerWithLeader()` registers this server with the Leader
- `syncWithLeader()` syncs data from the Leader; startup is also completed inside this method
- In the `while (this.isRunning()) {...}` loop, receives the packets sent by the leader and processes them
```java
void followLeader() throws InterruptedException {
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // todo: find the Leader server
        QuorumServer leaderServer = findLeader();
        try {
            // todo: establish the connection to the Leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            // todo: register with the leader (this sends data to the leader)
            // todo: the epoch marks which round of leader election this is; the leader
            //       collects the epochs it receives, picks the largest, and then unifies
            //       the epoch value across all learners
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch "
                        + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            // todo: sync data from the leader; initialization and startup also complete inside this method
            syncWithLeader(newEpochZxid);
            QuorumPacket qp = new QuorumPacket();
            // todo: loop in the follower, continually receiving packets from the leader and processing them
            while (this.isRunning()) {
                readPacket(qp);
                // todo: (process the proposals sent by the leader)
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner) this);
    }
}
```
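To give a sense of what `processPacket(qp)` does with each incoming packet, here is a hedged sketch of the dispatch; it is not the verbatim source, only the shape of the 3.4.x handler, and it omits the other packet types (REVALIDATE, SYNC, UPTODATE, ...):

```java
// Hedged sketch of Follower.processPacket() -- not the verbatim source, only the
// shape of it: the follower reacts to each QuorumPacket type sent by the leader.
protected void processPacket(QuorumPacket qp) throws IOException {
    switch (qp.getType()) {
    case Leader.PING:      // heartbeat from the leader: reply with touched session data
        ping(qp);
        break;
    case Leader.PROPOSAL:  // a proposed transaction: log it, then an ACK goes back
        // the payload is deserialized into a TxnHeader + Record and handed
        // to FollowerZooKeeperServer.logRequest(...)
        break;
    case Leader.COMMIT:    // a quorum has ACKed: apply the transaction with this zxid
        fzk.commit(qp.getZxid());
        break;
    }
}
```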