ZAB协议恢复模式-leader选举

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 详细讲述zookeeper的leader fast选举流程。 源码分析。 通过示例分析选举详细过程,以及可能的各种异常场景。

之前在网上看了很多zookeeper的zab原理,但讲述的都不够详细,很多地方模糊不清,所以就研究了一下zookeeper源码,并整理成几篇博客,希望对其他小伙伴有所帮助。本文是ZAB协议崩溃恢复Leader选举部分的内容,数据同步见另一篇博客 ZAB协议恢复模式-数据同步》     

为了避免理解上的歧义,将投票动作和投票信息区分开,在本文中,我将服务器的投票信息称之为选票。

一  基本概念

1.  Noitifcation

Notification其实是选举过程中的通信信息;选举过程主要围绕Notification进行。

选Leader过程中Zookeeper server(QuorumPeer)都会根据Notification信息生成Vote(选票信息)。为了方便以下理解,我们不妨将Notification看成每个zookeeper server的选票信息。

以下是notification的主要字段。

1)   zxid

事务id,事务请求的唯一标记,由leader服务器负责进行分配。

高32位是peerEpoch,低32位是请求的计数,从0开始。

2)   peerEpoch

每次leader选举完成之后,都会选举出一个新的peerEpoch,用来标记事务请求所属的轮次。

3)   electionEpoch

每次leader选举,electionEpoch就会自增1,统计选票信息时,首先保证electionEpoch相同。

4)   sid

服务器id

5)   leader

提议的leader。

 

2.  其他概念

1)   lastProcessedZxid

最后一次commit的事务请求的zxid

 

二  Zookeeper Server

1.  QuorumPeer

Zookeeper server启动始于QuorumPeerMain#main()。

Zookeeper server的主要逻辑都在QuorumPeer中,此类具体负责的逻辑如下流程图:

58e8cb9c917a85ac8d0cbd4fb901fe7199685217

本文仅介绍Leader选举流程内容,其他流程(follower流程、leader流程、observer流程)见《ZAB协议恢复模式-数据同步》。

从流程图中可以看到,QuorumPeer将一直运行,直到running=false。While无限循环中,根据当前zookeeper服务器的投票状态进入不同的业务逻辑。

服务器启动时处于LOOKING状态;退出任何子流程以后状态立即被改成LOOKING状态。LOOKING状态,表示Zookeeper服务端在进行选举流程。

在集群环境下,任何一台服务器都可能被选中成为leader,但每台服务器成为leader的可能性会有所不同,具体为:zxid、peerEpoch、electionEpoch、sid大者更容易被选举为leader,选举流程部分会详细讲述此中缘由。

 

2.  状态

1)   LOOKING

表示在选举leader。

2)   FOLLOWING

服务器的角色为follower。

3)   LEADING

服务器的角色为leader。

4)   OBSERVING

服务器的角色为observer,此种服务器不参与投票,只是同步状态。

 

三  Leader选举

1.  选举须知

选举流程比较复杂,在正式进入选举流程之前,需要先弄清楚以下内容:

每个Zookeeper服务端进入LOOKING状态以后,都会发起选举流程,默认情况下是快速选举,所以由FastLeaderElection#lookForLeader方法承担此职责。

每个Zookeeper服务器接收到选票提议以后,只有两个选择:

  • 接受选票提议,认可提议中推荐的服务器作为Leader候选人;
  • 不接受选票提议,推荐自己上一次推荐的服务器作为Leader候选人。(选举开始是总是推荐自己作为候选人,选举中会根据收到的选票信息决定是否更换推荐候选人)

默认情况下,至少超过半数(即n/2+1)服务器投票给同一个Leader候选人时,Leader候选人才有可能被选中为Leader。(这里说的是有可能,还需要进行一些其他逻辑进行验证)。

 

2.  流程

633b20279d62daad52c8efde132fcd07c89dc0ef

以上为代码的完整流程,看起来比较复杂,我们可以按照以下内容简单理解一下。

需要说明的是:此流程结束仅仅是确认那个服务器成为Leader,具体Leader是否能够最终成为Leader,还有另外的流程决定,这部分内容会见《ZAB协议恢复模式-数据同步》。

 

3.  流程详述

流程比较复杂,接下来对流程图中标有数字的地方详细介绍。

1)   1-自增logicalclock

Logicalclock就是Notification中的electionEpoch.

选举的第一个操作是logicalclock自增,接着更新提议信息,其实第一次总是提议自己作为Leader。

如果和现实中总统选举做一个类比的话,每次总统选举时都要明确这是第几届选举,logicalclock就对应的是“第几届”。整个选举必须保证处于同一届选举中方有效。

 

2)   2-发送选票信息

这是一个异步操作(由sendNotifications封装),将提议信息放到FastLeaderElection#sendqueue队列中,然后异步的发送个所有其他zookeeper server(这里指的是所有参与投票的服务器,不会发送个Observer类型的服务器)。

 

3)   3-从选票队列中取选票信息

当前server收到其他服务器的选举回复信息以后,将选票信息放在FastLeaderElection#recvqueue,当服务器循环从此队列中取选票信息。

如果队列中有选票信息立即返回,如果没有则等待。这里有一个超时时间,如果超过此时间依然没有选票信息,则返回null,这么做可以防止死等。

 

4)   4-判断消息是否发送出去

当从recvqueue没有取得选票信息时,会检查是否已经将提议的leader发送给其他server了,如果queueSendMap(待发送队列)为空,说明已经全部发送出去了;否则认为没有发送出去,此时会重连其他zookeeper server,保证链路畅通。

 

5)   5-重连其他ZookeeperServer

如果链路出现异常,可能会导致提议信息无法发送成功,所以如果queueSendMap中的信息没有全部发送出去,此时会重连其他zookeeperserver,以保证zookeeper集群的链路畅通。

 

6)   6-LOOKING状态时,electionEpoch比较

如果收到的选票信息状态为LOOKING,说明对方也在选举中。

a)   electionEpoch比较

进行electionEpoch比较的目的是统一当前是第几届选举。

  • 如果收到选票的electionEpoch更大,那么使用收到选票的electionEpoch作为“届”,然后清空收到的选票信息,更新提议信息(这里有一个判断过程),重新发送更新后的提议。
  • 如果收到选票的electionEpoch小,直接忽略此选票。
  • 如果收到选票的electionEpoch和当前相同,那么认为是合法的选票,接着判断是否应该更新选票。
b)   接受选票的提议

当且仅当以下三个条件满足其一时,将接受选票的提议,并重新发送选票信息。

  • n.peerEpoch > self.proposedEpoch
  • n.peerEpoch == self.proposedEpoch&& n.zxid > self.proposedZxid
  • n.peerEpoch == self.proposedEpoch&& n.zxid = self.proposedZxid && n.leader > self.proposedLeader

n指的是收到的选票,self指的是当前服务器自身的提议

由此可知:peerEpoch、zxid、leader越大,与有可能成为Leader。

proposedLeader开始的时候一定是当前server的id,但随着选举的进行,会变成上一次提议的leader。

 

7)   7-Leader是否有效

如果某一个server已经得到半数以上的选票,那么进入leader是否有效的验证逻辑,具体如下:

无限循环的从recvqueue中取选票,满足一下条件之一时退出循环:

  • recvqueue没有选举票(超时时间内一直没有获取到选票);
  • 取到一个更新的选票信息(满足“接受选票提议”的条件,则说明提议更新)。

这里其实是一个Leader有效性的校验。依次从recvqueue中取出所有的选票,校验发现所有的选票均满足“接受选票提议”时,说明没有服务器的选票能够推翻之前的结论,所以此时可以认为Leader是有效的。

 

8)   8-FOLLOWING、LEADING状态时,electionEpoch比较

a)   选票集合

为了将此部分解释清楚,需要先能清楚选举过程中用到的两个集合:

recvset:用来记录选票信息,以方便后续统计;

outofelection:用来记录选举逻辑之外的选票,例如当一个服务器加入zookeeper集群时,因为集群已经存在,不用重新选举,只需要在满足一定条件下加入集群即可。

 

b)   electionEpoch比较

如果收到的选票显示处于FOLLOWING、LEADING状态,说明集群目前有Leader,只需要确保当前服务器和Leader能够正常通信,并收到了集群半数以上服务器推荐推荐此Leader时,就直接加入到集群中去。

因为Leader已经存在,所以所有的选票都会加入到outofelection中。如果outofelection有一条选票是来自leader的,那么就可以认为自己和leader正常通信;如果outofelection中统计出有超过半数的服务器都推荐了这个leader,那么毫无疑问,此选票推荐的就是我们的leader。

 

c)   源码逻辑

  • 如果logicalclock = n.electionEpoch相同,那么将此选票加入到选票列表中,如果此张选票通过了“选票有效性验证”,那么将此选票推举的候选人作为leader。
  • 因为leader已存在,将所有选票放在outofelection中,进行一次选票“选票有效性验证”,如果通过就可以将次选票推举的候选人作为leader。

以上两步的差别是在进行有效性校验时,一个用的是recvset,一个用的是outofelection。从代码上看,zookeeper认为只要electionEpoch就认为这是在选举,所以判断选票数目的时候使用的是recvset。

以上两步逻辑比较绕,如果理解起来比较困难,可以参考一下源码。

 

d)   源码

case FOLLOWING:
case LEADING:
if (n.electionEpoch == logicalclock) {
   //如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中,
   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
 
   //判定选举是否结束
    if (ooePredicate(recvset,outofelection, n)){
       // 选举结束,设置状态
       self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
 
       Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
       leaveInstance(endVote);
       return endVote;
    }
}
 
outofelection.put(n.sid,
       new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
// 判定选举是否结束
if(ooePredicate(outofelection, outofelection, n)) {
   synchronized (this) {
       logicalclock = n.electionEpoch;
       self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
    }
   Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
   leaveInstance(endVote);
   return endVote;
}
break;


 

e)   选票有效性验证

  • leader候选人获得超过半数的选票。
  • 通过Leader有效性校验。

 

f)   Leader有效性验证

  • 如果自己不是leader,那么一定要收到过Leader的信息,即收到Leader信息,并且leader的回复信息中宣称自己的状态是ServerState.LEADING
  • 如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch

 

4.  核心类

以下为选举过程中使用到的核心类。

1)   QuorumPeer

见整理流程部分

2)   FastLeaderElection

默认的选举算法,即上面流程图描述的内容。

此类有几个重要的内部类,如下:

a)   FastLeaderElection#Messenger#WorkerReceiver

从QuorumCnxManager#recvQueue中获取网络包,并将其发到FastLeaderElection#recvqueue中

b)   FastLeaderElection#Messenger#WorkerSender

从FastLeaderElection#sendqueue中获取网络包,并将其放到QuorumCnxManager#queueSendMap中,并发送到网络上

 

3)   QuorumCnxManager

QuorumCnxManager是实际发生网络交互的地方。QuorumCnxManager保证与每一个zookeeper服务器之间只有一个链接。

主要数据结构如下:

a)   queueSendMap

sid(key) -> buffer queue(value),为每个参与投票的server都保留一个队列。

b)   recvQueue

message queue,所有收到的消息都放到recvQueue。

c)   listener

server主线程,收发消息时和上面两个队列交互。

 

四  源码分析

以下是选举过程中用到的主要代码以及注释。代码版本为zookeeper-3.4.10。

我给选举逻辑的部分代码加了注释,以便于理解,下载地址:git@gitee.com:wuzhengfei/zookeeper-3.4.10-sources.git

 

1.  FastLeaderElection

1)   lookForLeader


   public Vote lookForLeader() throws InterruptedException {
       try {
           self.jmxLeaderElectionBean = newLeaderElectionBean();
           MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean,self.jmxLocalPeerBean);
       } catch (Exception e) {
            LOG.warn("Failedto register with JMX", e);
            self.jmxLeaderElectionBean = null;
       }
       if (self.start_fle== 0) {
           self.start_fle = System.currentTimeMillis();
       }
       try {
           // 投票信息列表
           HashMap<Long, Vote> recvset = newHashMap<Long, Vote>();
 
           HashMap<Long, Vote> outofelection = newHashMap<Long, Vote>();
 
           int notTimeout = finalizeWait;
 
           synchronized(this) {
               //更新选举周期
               logicalclock++;
                // 更新提议信息,提议自己作为leader,
               updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
           }
 
           LOG.info("New election. My id =  " + self.getId() + ",proposed zxid=0x" + Long.toHexString(proposedZxid));
           // 发送选票信息
           sendNotifications();
 
           /*
             * 循环直到选举出leader
             */
           while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
                // 从recvqueue移除第一个Notification(选票信息),最多等待notTimeout ms
               Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
 
                // 如果当前没有收到足够多的Notification,不足以产生leader,那么继续发送选票信息,否则继续处理收到的Notification
               if(n == null){
                   // 当前没有收到回复信息
                   if (manager.haveDelivered()){
                       // 如果消息全部投递出去了,那么在发送一次选票信息
                      sendNotifications();
                   } else {
                       // 如果消息没有投递出去,那么尝试重连
                       manager.connectAll();
                   }
 
                   int tmpTimeOut= notTimeout * 2;
                   notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                   LOG.info("Notificationtime out: " + notTimeout);
               } elseif (self.getVotingView().containsKey(n.sid)) {
                   /*
                    * Only proceed if the vote comes from a replica in the
                    * voting view.
                    */
                   switch (n.state) {
                   case LOOKING:
                       // If notification > current, replace and sendmessages
                       // out
                       if (n.electionEpoch > logicalclock){
                          logicalclock = n.electionEpoch;
                          // 清空收到的选票信息列表
                          recvset.clear();
                           // 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
                          if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(),
                                  getPeerEpoch())) {
                              updateProposal(n.leader, n.zxid, n.peerEpoch);
                          } else {
                              updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
                          }
                          sendNotifications();
                       } else if (n.electionEpoch< logicalclock) {
                           // 如果notification#electionEpoch小,认为是无效选票,直接忽略
                          if (LOG.isDebugEnabled()){
                              LOG.debug(
                                       "Notificationelection epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                             + Long.toHexString(n.electionEpoch) + ",logicalclock=0x"
                                             + Long.toHexString(logicalclock));
                          }
                          break;
                       } else if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
                              proposedEpoch)){
                           // 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
                          updateProposal(n.leader, n.zxid, n.peerEpoch);
                          sendNotifications();
                       }
 
                       if (LOG.isDebugEnabled()){
                          LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader
                                  + ",proposed zxid=0x" + Long.toHexString(n.zxid) + ",proposed election epoch=0x"
                                  + Long.toHexString(n.electionEpoch));
                       }
 
                       // 将收到的选票加入到选票列表中
                       recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch,n.peerEpoch));System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+"    recvset="+JSON.toJSONString(recvset));
                      System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+"    recvset="+JSON.toJSONString(recvset));
                       
                       // 判断self提议的服务器是否已经获得超过半数的票
                       if (termPredicate(recvset,
                               new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {
 
                           // 循环直到当recvqueue中没有Notification,或有出现一个Notification的选票信息比推举的leader更大时,方跳出循环
                          while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS))!= null) {
                              if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
                                      proposedEpoch)) {
                                  recvqueue.put(n);
                                  break;
                              }
                          }
 
                          if (n== null) {
                               // 进入这里说明recvqueue中没有比推举leader更大的票,可以将推举的服务器作为leader了。
                              self.setPeerState(
                                      (proposedLeader == self.getId())? ServerState.LEADING : learningState());
 
                              Vote endVote = new Vote(proposedLeader,proposedZxid, logicalclock,proposedEpoch);
                              leaveInstance(endVote);
                              return endVote;
                          }
                       }
                       break;
                   case OBSERVING:
                       LOG.debug("Notificationfrom observer: " + n.sid);
                       break;
                   case FOLLOWING:
                   case LEADING:
                       /**
                        * if内的代码、if外的代码比较类似,唯一的差别是ooePredicate方法的第一个参数,
                        * 考虑集群中只有leader存活,其他所有服务器都处于LOOKING状态,就能理解为什么需要这两段逻辑了。
                        */
                       if (n.electionEpoch == logicalclock){
                           // 如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中
                          recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
 
                          // 判定选举是否结束
                          if (ooePredicate(recvset, outofelection,n)) {
                              // 选举结束,设置状态
                              self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
 
                              Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                              leaveInstance(endVote);
                              return endVote;
                          }
                       }
 
                       outofelection.put(n.sid,
                              new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state));
                       // 判定选举是否结束
                       if (ooePredicate(outofelection,outofelection, n)){
                          synchronized (this) {
                              logicalclock = n.electionEpoch;
                              self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
                          }
                          Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                          leaveInstance(endVote);
                          return endVote;
                       }
                       break;
                   default:
                       LOG.warn("Notification state unrecognized:{} (n.state), {} (n.sid)", n.state, n.sid);
                       break;
                   }
               } else{
                   LOG.warn("Ignoring notification from non-clustermember " + n.sid);
               }
           }
           return null;
       } finally {
           try {
               if(self.jmxLeaderElectionBean!= null) {
                  MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
               }
           } catch(Exception e) {
                LOG.warn("Failedto unregister with JMX", e);
           }
            self.jmxLeaderElectionBean = null;
           LOG.debug("Number of connection processing threads: {}",manager.getConnectionThreadCount());
       }
   }


 

2)   totalOrderPredicate


   /**
     * 如果new票信息大于当前的票信息,那么返回true。以下三种case,只要满足一种就返回true:
     * <li>newEpoch > curEpoch
     * <li>newEpoch == curEpoch&& newZxid > curZxid
     * <li>newEpoch == curEpoch&& newZxid = curZxid && newId > curId
     */
   protected booleantotalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid,
           long curEpoch) {
       LOG.debug("id: " + newId+ ", proposed id: " + curId + ", zxid:0x" + Long.toHexString(newZxid)
               + ",proposed zxid: 0x" + Long.toHexString(curZxid));
       if (self.getQuorumVerifier().getWeight(newId) == 0) {
           return false;
       }
 
       /*
         * We return true if one of thefollowing three cases hold: 1- New epoch
         * is higher 2- New epoch isthe same as current epoch, but new zxid is
         * higher 3- New epoch is thesame as current epoch, new zxid is the
         * same as current zxid,but server id is higher.
         */
        return ((newEpoch > curEpoch)
               || ((newEpoch== curEpoch) && ((newZxid > curZxid)|| ((newZxid == curZxid)&& (newId > curId)))));
   }

 

3)   termPredicate


   /**
     * 是否终止投票,如果{@linkvote}对应的服务器获得了超过半数的票,那么返回true
     * 
     * @param votes
     * @param vote
     * @return
     */
   protected booleantermPredicate(HashMap<Long, Vote> votes,Vote vote) {
 
       HashSet<Long> set= new HashSet<Long>();
 
       /*
         * First make the viewsconsistent. Sometimes peers will have different
         * zxids for a serverdepending on timing.
         */
       for (Map.Entry<Long,Vote> entry : votes.entrySet()){
           if (vote.equals(entry.getValue())){
               set.add(entry.getKey());
           }
       }
 
       return self.getQuorumVerifier().containsQuorum(set);
   }

 

4)   checkLeader


   /**
     * leader确认检查
     * <li>如果自己不是leader,那么一定要收到过Leader的信息,并且leader宣称自己的状态是ServerState.LEADING;
     * <li>如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch
     * 
     * @param outofelection
     * @param leader
     * @param electionEpoch
     */
   protected booleancheckLeader(HashMap<Long, Vote> outofelection,long leader,long electionEpoch){
 
       boolean predicate = true;
 
       /*
         * If everyone else thinks I'mthe leader, I must be the leader. The
         * other two checks are justfor the case in which I'm not the leader.
         * If I'm not the leader and Ihaven't received a message from leader
         * stating that it is leading,then predicate is false.
         */
 
       if (leader != self.getId()){
           if (outofelection.get(leader)== null)
                // 表示还未从leader中收到任何信息,所以选举结束未结束。
               predicate= false;
           else if (outofelection.get(leader).getState() != ServerState.LEADING)
                // 指推举的那个leader还没有将自己的状态改成leader,所以选举依然未完成。
               predicate= false;
       } else if (logicalclock!= electionEpoch) {
            // 自己是leader时,这两个值应该是相同的,如果不同说明有异常,不能结束选举。
           predicate= false;
       }
 
       return predicate;
   }


 

5)   ooePredicate

   /**
     * 是否能宣称选举结束
     * <li>{@code n}获得了半数以上的票
     * <li>通过了leader确认的检查
     * 
     * @param recv
     *           map of received votes
     * @param ooe
     *           map containing out of election votes (LEADING or FOLLOWING)
     * @param n
     *           Notification
     * @return
     */
   protected booleanooePredicate(HashMap<Long, Vote> recv,HashMap<Long, Vote> ooe, Notification n) {
 
       return (termPredicate(recv, new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state))
               && checkLeader(ooe, n.leader, n.electionEpoch));
 
   }

 

五  选举示例分析

假设zookeeper集群的部署信息如下:

server.1=192.168.1.1:2888:3888

server.2=192.168.1.2:2888:3888

server.3=192.168.1.3:2888:3888

1.  集群启动时

根据上面的信息,由此我们可以得出集群启动时具有如下信息:

Server

Sid/myid

peerEpoch

zxid

Ip

S1

1

0

0

192.168.1.1

S2

2

0

0

192.168.1.2

S3

3

0

0

192.168.1.3

Zookeeper每一台服务器启动时,默认状态为LOOKING状态,所以都会启动选leader流程。当然如果只有一台启动成功,无法满足选举的超过一半的条件,所以永远无法完成选leader逻辑。

 

1)   S1服务器选举过程

为了描述的简单,假设集群只启动S1、S2服务器,我们看一下选举过程是怎样的。

a)   S1发送提议

S1启动后首先就是进行Leader选举。S1首先logicalclock改为1,接着发送选票信息,S1的选票信息如下:

proposedLeader

proposedZxid

proposedEpoch

1

0

0

b)   S2收到S1选举提议

S2启动后就会进入选举流程,因为logicalclock=1,所以忽略同步时钟这个操作;此时S2的提议信息为:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

S2收到信息以后,将消息放在recvqueue中,接着会比较S1的提议和自己的提议信息,此时S2发现self.proposedLeader>S1.proposedLeader,所以S2回复提议信息如下:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

c)   S1收到S2回复的选票信息

S1比较提议后发现自己推举的Leader小,所以将自己的提议信息更新为:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

d)   S2再次收到S1的选票信息

此时S2不用更新提议信息,直接回复S1,回复的提议内容如下:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

e)   S1收到S2的回复的选举提议

此时S1自己的选票信息为:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

收到S2的选票信息为:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

汇总后S1发现,目前收到的选票信息如下:

proposedLeader

proposedZxid

proposedEpoch

2

0

0

2

0

0

因为满足超过半数的条件,所以S1会认为S2是leader,S1退出选举流程。

2)   S1服务器收到的选票信息

a)   S1未收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

 

b)   S1收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

进行到这一步是S1会更新自己的提议信息,改成选举S2作为leader

 

c)   S1收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

进行到这一步就可以认为S2服务器当选成为Leader了。

 

3)   S2的leader选举

S2的选举过程和上面基本一样,唯一不同的地方是:S2一直选举的都是S2服务器,所以不用像S1一样选举过程中修改提议的Leader。

 

 

2.  新服务器加入集群

a)   初始条件假设

继续上面的示例,假设集群开始只启动了S1、S2服务器,运行一段时间以后S3服务器希望加入到集群中。

假设运行一段时间以后集群的情况如下:

Server

Sid/myid

peerEpoch

Zxid(低32位)

Ip

Leader

S1

1

10

900

192.168.1.1

S2

2

10

1000

192.168.1.2

S3新加入集群,加入时的情况如下:

Server

Sid/myid

peerEpoch

Ip

Leader

S3

3

0

192.168.1.3

-

b)   S3发送提议

S3启动后会发起选主流程,发送的提议信息如下:

proposedLeader

proposedZxid

proposedEpoch

3

0

0

c)   S1收到S3的提议信息

因为S3已经是Follower,并且epoch、zxid均大于S3,所以回复的提议如下:

proposedLeader

proposedZxid

proposedEpoch

2

900

10

d)   S2收到S3的提议信息

因为S2已经是Leader,并且epoch、zxid均大于S3,所以回复的提议如下:

proposedLeader

proposedZxid

proposedEpoch

2

1000

10

 

e)   S3收到回复信息

S3统计选票信息会发现S1、S2都推荐S2作为Leader,并且S2已经是leader,因为超过半数推荐S2,所以此时S3也会将S2作为leader,结束选主流程

 

3.  又有新服务器加入集群

继续假设按照上面信息部署集群,又来了一个服务器S4希望加入集群,那么流程会如何呢?

选举过程中存在以下代码:

if (self.getVotingView().containsKey(n.sid)) {

    // 选主逻辑

}

这意味着S1、S2、S3收到S4的选票请求,认为这不是当前集群的服务器,直接忽略选票信息。结论是:S4无法加入集群的。

 

4.  集群运行中重新选举

运行过程中如果Leader失效,或者超过半数服务器与Leader不同步等等情况都可能导致集群运行一段时间后,触发了选主流程。接下来我们以一种最为复杂的方式来介绍选举的完整过程。假设集群信息如下:

Server

Sid/myid

peerEpoch

Zxid(高位_地位)

Ip

S1

1

200

200_900

192.168.1.1

S2

2

100

100_1000

192.168.1.2

S3

3

50

50_1950

192.168.1.3

Zxid的高位是peerEpoch,低位是自增的序列,所以如果peerEpoch大,那么zxid一定大。为了方便描述,zxid我改成“高位_地位”的形式了。

说明:一般而言,正常运行过程是不会出现以上假设的数据的,除非数据被人修改过,或者其他未知原因导致所有服务器的数据均错乱了。

因为集群中没有了leader,所以所有服务器均将重新进入选主流程。接下来我们就以S3服务器为例来讨论选主流程。

a)   第一轮选票

S3发出的选票信息信息为:

proposedLeader

proposedZxid

proposedEpoch

3

50_1950

50

S2收到提议后回复的提议信息如下:

proposedLeader

proposedZxid

proposedEpoch

2

100_1000

100

S1收到提议后回复的提议信息如下:

proposedLeader

proposedZxid

proposedEpoch

1

200_900

200

b)   第一轮选票汇总

假设S3服务器先收到S2的回复,因为S3.proposedEpoch < S2.proposedEpoch,所以S3先更新提议的信息,然后重新发送提议(P1),新的提议如下:

proposedLeader

proposedZxid

proposedEpoch

2

100_1000

100

紧接着接着S3又收到S1的回复,又发现S3.proposedEpoch < S1.proposedEpoch,所以S3再一次更新提议信息,然后发送新的提议(P2),新的提议如下:

proposedLeader

proposedZxid

proposedEpoch

1

200_900

200

因为提议P2最终会覆盖提议P1,所以我们直接忽略S1、S2对P1的回复。直接讨论他们对P2的回复。

S1收到选票的提议以后,回复以下提议信息:

proposedLeader

proposedZxid

proposedEpoch

1

200_900

200

S2收到选票的提议以后,回复以下提议信息:

proposedLeader

proposedZxid

proposedEpoch

1

200_900

200

 

c)   第二轮选票汇总

S3收到的提议信息如下:

proposedLeader

proposedZxid

proposedEpoch

1

200_900

200

1

200_900

200

1

200_900

200

S3汇总以后得出的结论是S1成为Leader,然后退出选举。

 

 

 

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
19天前
|
消息中间件 算法 网络协议
选举机制理解描述
选举机制理解描述
24 1
选举机制理解描述
|
2月前
|
存储 移动开发 算法
Quorum NWR:通过仲裁实现数据一致性
Quorum NWR:通过仲裁实现数据一致性
64 11
|
4月前
|
消息中间件 Kafka 程序员
Kafka内幕:详解Leader选举与副本同步的那些事儿
大家好,我是小米,今天给大家带来一篇关于 Kafka 核心机制的深度解析文章。本文将详细讲解 Kafka 的 Leader 选举、副本消息同步以及相关概念 LEO 和 HW,帮助大家更好地理解和应用 Kafka,提升处理分布式系统的能力。快来一起学习吧!
509 0
|
存储 算法 Java
深入理解 ZK集群的Leader选举(一)
深入理解 ZK集群的Leader选举(一)
353 0
|
监控 NoSQL 算法
从哨兵Leader选举学习Raft协议实现(上)
从哨兵Leader选举学习Raft协议实现(上)
97 0
|
NoSQL Redis Sentinel
从哨兵Leader选举学习Raft协议实现(下)(二)
从哨兵Leader选举学习Raft协议实现(下)
54 0
|
Sentinel
从哨兵Leader选举学习Raft协议实现(下)(一)
从哨兵Leader选举学习Raft协议实现(下)
55 0
Zookeeper Leader选举机制
Zookeeper Leader选举机制
82 0
|
算法
实现分布式 kv—2 raft leader 选举
raft 是一个分布式一致性算法,主要保证的是在分布式系统中,各个节点的数据一致性。raft 算法比较复杂,因为它所解决的分布式一致性问题本来就是一个比较棘手的问题,raft 算法的实现主要可以拆解为三个部分: • 领导选举 • 日志复制 • 安全性
133 2
|
网络协议 数据库
Zookeeper ZAB 一致性协议
Zookeeper 是通过 ZAB 一致性协议来实现分布式事务的最终一致性。
158 1
Zookeeper ZAB 一致性协议