之前在网上看了很多zookeeper的zab原理,但讲述的都不够详细,很多地方模糊不清,所以就研究了一下zookeeper源码,并整理成几篇博客,希望对其他小伙伴有所帮助。本文是ZAB协议崩溃恢复Leader选举部分的内容,数据同步见另一篇博客 《ZAB协议恢复模式-数据同步》
。
为了避免理解上的歧义,将投票动作和投票信息区分开,在本文中,我将服务器的投票信息称之为选票。
一 基本概念
1. Noitifcation
Notification其实是选举过程中的通信信息;选举过程主要围绕Notification进行。
选Leader过程中Zookeeper server(QuorumPeer)都会根据Notification信息生成Vote(选票信息)。为了方便以下理解,我们不妨将Notification看成每个zookeeper server的选票信息。
以下是notification的主要字段。
1) zxid
事务id,事务请求的唯一标记,由leader服务器负责进行分配。
高32位是peerEpoch,低32位是请求的计数,从0开始。
2) peerEpoch
每次leader选举完成之后,都会选举出一个新的peerEpoch,用来标记事务请求所属的轮次。
3) electionEpoch
每次leader选举,electionEpoch就会自增1,统计选票信息时,首先保证electionEpoch相同。
4) sid
服务器id
5) leader
提议的leader。
2. 其他概念
1) lastProcessedZxid
最后一次commit的事务请求的zxid
二 Zookeeper Server
1. QuorumPeer
Zookeeper server启动始于QuorumPeerMain#main()。
Zookeeper server的主要逻辑都在QuorumPeer中,此类具体负责的逻辑如下流程图:
本文仅介绍Leader选举流程内容,其他流程(follower流程、leader流程、observer流程)见《ZAB协议恢复模式-数据同步》。
从流程图中可以看到,QuorumPeer将一直运行,直到running=false。While无限循环中,根据当前zookeeper服务器的投票状态进入不同的业务逻辑。
服务器启动时处于LOOKING状态;退出任何子流程以后状态立即被改成LOOKING状态。LOOKING状态,表示Zookeeper服务端在进行选举流程。
在集群环境下,任何一台服务器都可能被选中成为leader,但每台服务器成为leader的可能性会有所不同,具体为:zxid、peerEpoch、electionEpoch、sid大者更容易被选举为leader,选举流程部分会详细讲述此中缘由。
2. 状态
1) LOOKING
表示在选举leader。
2) FOLLOWING
服务器的角色为follower。
3) LEADING
服务器的角色为leader。
4) OBSERVING
服务器的角色为observer,此种服务器不参与投票,只是同步状态。
三 Leader选举
1. 选举须知
选举流程比较复杂,在正式进入选举流程之前,需要先弄清楚以下内容:
每个Zookeeper服务端进入LOOKING状态以后,都会发起选举流程,默认情况下是快速选举,所以由FastLeaderElection#lookForLeader方法承担此职责。
每个Zookeeper服务器接收到选票提议以后,只有两个选择:
- 接受选票提议,认可提议中推荐的服务器作为Leader候选人;
- 不接受选票提议,推荐自己上一次推荐的服务器作为Leader候选人。(选举开始是总是推荐自己作为候选人,选举中会根据收到的选票信息决定是否更换推荐候选人)
默认情况下,至少超过半数(即n/2+1)服务器投票给同一个Leader候选人时,Leader候选人才有可能被选中为Leader。(这里说的是有可能,还需要进行一些其他逻辑进行验证)。
2. 流程
以上为代码的完整流程,看起来比较复杂,我们可以按照以下内容简单理解一下。
需要说明的是:此流程结束仅仅是确认那个服务器成为Leader,具体Leader是否能够最终成为Leader,还有另外的流程决定,这部分内容会见《ZAB协议恢复模式-数据同步》。
3. 流程详述
流程比较复杂,接下来对流程图中标有数字的地方详细介绍。
1) 1-自增logicalclock
Logicalclock就是Notification中的electionEpoch.
选举的第一个操作是logicalclock自增,接着更新提议信息,其实第一次总是提议自己作为Leader。
如果和现实中总统选举做一个类比的话,每次总统选举时都要明确这是第几届选举,logicalclock就对应的是“第几届”。整个选举必须保证处于同一届选举中方有效。
2) 2-发送选票信息
这是一个异步操作(由sendNotifications封装),将提议信息放到FastLeaderElection#sendqueue队列中,然后异步的发送个所有其他zookeeper server(这里指的是所有参与投票的服务器,不会发送个Observer类型的服务器)。
3) 3-从选票队列中取选票信息
当前server收到其他服务器的选举回复信息以后,将选票信息放在FastLeaderElection#recvqueue,当服务器循环从此队列中取选票信息。
如果队列中有选票信息立即返回,如果没有则等待。这里有一个超时时间,如果超过此时间依然没有选票信息,则返回null,这么做可以防止死等。
4) 4-判断消息是否发送出去
当从recvqueue没有取得选票信息时,会检查是否已经将提议的leader发送给其他server了,如果queueSendMap(待发送队列)为空,说明已经全部发送出去了;否则认为没有发送出去,此时会重连其他zookeeper server,保证链路畅通。
5) 5-重连其他ZookeeperServer
如果链路出现异常,可能会导致提议信息无法发送成功,所以如果queueSendMap中的信息没有全部发送出去,此时会重连其他zookeeperserver,以保证zookeeper集群的链路畅通。
6) 6-LOOKING状态时,electionEpoch比较
如果收到的选票信息状态为LOOKING,说明对方也在选举中。
a) electionEpoch比较
进行electionEpoch比较的目的是统一当前是第几届选举。
- 如果收到选票的electionEpoch更大,那么使用收到选票的electionEpoch作为“届”,然后清空收到的选票信息,更新提议信息(这里有一个判断过程),重新发送更新后的提议。
- 如果收到选票的electionEpoch小,直接忽略此选票。
- 如果收到选票的electionEpoch和当前相同,那么认为是合法的选票,接着判断是否应该更新选票。
b) 接受选票的提议
当且仅当以下三个条件满足其一时,将接受选票的提议,并重新发送选票信息。
- n.peerEpoch > self.proposedEpoch
- n.peerEpoch == self.proposedEpoch&& n.zxid > self.proposedZxid
- n.peerEpoch == self.proposedEpoch&& n.zxid = self.proposedZxid && n.leader > self.proposedLeader
n指的是收到的选票,self指的是当前服务器自身的提议
由此可知:peerEpoch、zxid、leader越大,与有可能成为Leader。
proposedLeader开始的时候一定是当前server的id,但随着选举的进行,会变成上一次提议的leader。
7) 7-Leader是否有效
如果某一个server已经得到半数以上的选票,那么进入leader是否有效的验证逻辑,具体如下:
无限循环的从recvqueue中取选票,满足一下条件之一时退出循环:
- recvqueue没有选举票(超时时间内一直没有获取到选票);
- 取到一个更新的选票信息(满足“接受选票提议”的条件,则说明提议更新)。
这里其实是一个Leader有效性的校验。依次从recvqueue中取出所有的选票,校验发现所有的选票均满足“接受选票提议”时,说明没有服务器的选票能够推翻之前的结论,所以此时可以认为Leader是有效的。
8) 8-FOLLOWING、LEADING状态时,electionEpoch比较
a) 选票集合
为了将此部分解释清楚,需要先能清楚选举过程中用到的两个集合:
recvset:用来记录选票信息,以方便后续统计;
outofelection:用来记录选举逻辑之外的选票,例如当一个服务器加入zookeeper集群时,因为集群已经存在,不用重新选举,只需要在满足一定条件下加入集群即可。
b) electionEpoch比较
如果收到的选票显示处于FOLLOWING、LEADING状态,说明集群目前有Leader,只需要确保当前服务器和Leader能够正常通信,并收到了集群半数以上服务器推荐推荐此Leader时,就直接加入到集群中去。
因为Leader已经存在,所以所有的选票都会加入到outofelection中。如果outofelection有一条选票是来自leader的,那么就可以认为自己和leader正常通信;如果outofelection中统计出有超过半数的服务器都推荐了这个leader,那么毫无疑问,此选票推荐的就是我们的leader。
c) 源码逻辑
- 如果logicalclock = n.electionEpoch相同,那么将此选票加入到选票列表中,如果此张选票通过了“选票有效性验证”,那么将此选票推举的候选人作为leader。
- 因为leader已存在,将所有选票放在outofelection中,进行一次选票“选票有效性验证”,如果通过就可以将次选票推举的候选人作为leader。
以上两步的差别是在进行有效性校验时,一个用的是recvset,一个用的是outofelection。从代码上看,zookeeper认为只要electionEpoch就认为这是在选举,所以判断选票数目的时候使用的是recvset。
以上两步逻辑比较绕,如果理解起来比较困难,可以参考一下源码。
d) 源码
case FOLLOWING:
case LEADING:
if (n.electionEpoch == logicalclock) {
//如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中,
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判定选举是否结束
if (ooePredicate(recvset,outofelection, n)){
// 选举结束,设置状态
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
outofelection.put(n.sid,
new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
// 判定选举是否结束
if(ooePredicate(outofelection, outofelection, n)) {
synchronized (this) {
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
e) 选票有效性验证
- leader候选人获得超过半数的选票。
- 通过Leader有效性校验。
f) Leader有效性验证
- 如果自己不是leader,那么一定要收到过Leader的信息,即收到Leader信息,并且leader的回复信息中宣称自己的状态是ServerState.LEADING
- 如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch
4. 核心类
以下为选举过程中使用到的核心类。
1) QuorumPeer
见整理流程部分
2) FastLeaderElection
默认的选举算法,即上面流程图描述的内容。
此类有几个重要的内部类,如下:
a) FastLeaderElection#Messenger#WorkerReceiver
从QuorumCnxManager#recvQueue中获取网络包,并将其发到FastLeaderElection#recvqueue中
b) FastLeaderElection#Messenger#WorkerSender
从FastLeaderElection#sendqueue中获取网络包,并将其放到QuorumCnxManager#queueSendMap中,并发送到网络上
3) QuorumCnxManager
QuorumCnxManager是实际发生网络交互的地方。QuorumCnxManager保证与每一个zookeeper服务器之间只有一个链接。
主要数据结构如下:
a) queueSendMap
sid(key) -> buffer queue(value),为每个参与投票的server都保留一个队列。
b) recvQueue
message queue,所有收到的消息都放到recvQueue。
c) listener
server主线程,收发消息时和上面两个队列交互。
四 源码分析
以下是选举过程中用到的主要代码以及注释。代码版本为zookeeper-3.4.10。
我给选举逻辑的部分代码加了注释,以便于理解,下载地址:git@gitee.com:wuzhengfei/zookeeper-3.4.10-sources.git
1. FastLeaderElection
1) lookForLeader
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = newLeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean,self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failedto register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle== 0) {
self.start_fle = System.currentTimeMillis();
}
try {
// 投票信息列表
HashMap<Long, Vote> recvset = newHashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = newHashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this) {
//更新选举周期
logicalclock++;
// 更新提议信息,提议自己作为leader,
updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() + ",proposed zxid=0x" + Long.toHexString(proposedZxid));
// 发送选票信息
sendNotifications();
/*
* 循环直到选举出leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
// 从recvqueue移除第一个Notification(选票信息),最多等待notTimeout ms
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
// 如果当前没有收到足够多的Notification,不足以产生leader,那么继续发送选票信息,否则继续处理收到的Notification
if(n == null){
// 当前没有收到回复信息
if (manager.haveDelivered()){
// 如果消息全部投递出去了,那么在发送一次选票信息
sendNotifications();
} else {
// 如果消息没有投递出去,那么尝试重连
manager.connectAll();
}
int tmpTimeOut= notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
LOG.info("Notificationtime out: " + notTimeout);
} elseif (self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and sendmessages
// out
if (n.electionEpoch > logicalclock){
logicalclock = n.electionEpoch;
// 清空收到的选票信息列表
recvset.clear();
// 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(),
getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch< logicalclock) {
// 如果notification#electionEpoch小,认为是无效选票,直接忽略
if (LOG.isDebugEnabled()){
LOG.debug(
"Notificationelection epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch) + ",logicalclock=0x"
+ Long.toHexString(logicalclock));
}
break;
} else if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
proposedEpoch)){
// 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if (LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader
+ ",proposed zxid=0x" + Long.toHexString(n.zxid) + ",proposed election epoch=0x"
+ Long.toHexString(n.electionEpoch));
}
// 将收到的选票加入到选票列表中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch,n.peerEpoch));System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+" recvset="+JSON.toJSONString(recvset));
System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+" recvset="+JSON.toJSONString(recvset));
// 判断self提议的服务器是否已经获得超过半数的票
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {
// 循环直到当recvqueue中没有Notification,或有出现一个Notification的选票信息比推举的leader更大时,方跳出循环
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS))!= null) {
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n== null) {
// 进入这里说明recvqueue中没有比推举leader更大的票,可以将推举的服务器作为leader了。
self.setPeerState(
(proposedLeader == self.getId())? ServerState.LEADING : learningState());
Vote endVote = new Vote(proposedLeader,proposedZxid, logicalclock,proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notificationfrom observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/**
* if内的代码、if外的代码比较类似,唯一的差别是ooePredicate方法的第一个参数,
* 考虑集群中只有leader存活,其他所有服务器都处于LOOKING状态,就能理解为什么需要这两段逻辑了。
*/
if (n.electionEpoch == logicalclock){
// 如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 判定选举是否结束
if (ooePredicate(recvset, outofelection,n)) {
// 选举结束,设置状态
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
outofelection.put(n.sid,
new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state));
// 判定选举是否结束
if (ooePredicate(outofelection,outofelection, n)){
synchronized (this) {
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized:{} (n.state), {} (n.sid)", n.state, n.sid);
break;
}
} else{
LOG.warn("Ignoring notification from non-clustermember " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean!= null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch(Exception e) {
LOG.warn("Failedto unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",manager.getConnectionThreadCount());
}
}
2) totalOrderPredicate
/**
* 如果new票信息大于当前的票信息,那么返回true。以下三种case,只要满足一种就返回true:
* <li>newEpoch > curEpoch
* <li>newEpoch == curEpoch&& newZxid > curZxid
* <li>newEpoch == curEpoch&& newZxid = curZxid && newId > curId
*/
protected booleantotalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid,
long curEpoch) {
LOG.debug("id: " + newId+ ", proposed id: " + curId + ", zxid:0x" + Long.toHexString(newZxid)
+ ",proposed zxid: 0x" + Long.toHexString(curZxid));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
/*
* We return true if one of thefollowing three cases hold: 1- New epoch
* is higher 2- New epoch isthe same as current epoch, but new zxid is
* higher 3- New epoch is thesame as current epoch, new zxid is the
* same as current zxid,but server id is higher.
*/
return ((newEpoch > curEpoch)
|| ((newEpoch== curEpoch) && ((newZxid > curZxid)|| ((newZxid == curZxid)&& (newId > curId)))));
}
3) termPredicate
/**
* 是否终止投票,如果{@linkvote}对应的服务器获得了超过半数的票,那么返回true
*
* @param votes
* @param vote
* @return
*/
protected booleantermPredicate(HashMap<Long, Vote> votes,Vote vote) {
HashSet<Long> set= new HashSet<Long>();
/*
* First make the viewsconsistent. Sometimes peers will have different
* zxids for a serverdepending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()){
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
4) checkLeader
/**
* leader确认检查
* <li>如果自己不是leader,那么一定要收到过Leader的信息,并且leader宣称自己的状态是ServerState.LEADING;
* <li>如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch
*
* @param outofelection
* @param leader
* @param electionEpoch
*/
protected booleancheckLeader(HashMap<Long, Vote> outofelection,long leader,long electionEpoch){
boolean predicate = true;
/*
* If everyone else thinks I'mthe leader, I must be the leader. The
* other two checks are justfor the case in which I'm not the leader.
* If I'm not the leader and Ihaven't received a message from leader
* stating that it is leading,then predicate is false.
*/
if (leader != self.getId()){
if (outofelection.get(leader)== null)
// 表示还未从leader中收到任何信息,所以选举结束未结束。
predicate= false;
else if (outofelection.get(leader).getState() != ServerState.LEADING)
// 指推举的那个leader还没有将自己的状态改成leader,所以选举依然未完成。
predicate= false;
} else if (logicalclock!= electionEpoch) {
// 自己是leader时,这两个值应该是相同的,如果不同说明有异常,不能结束选举。
predicate= false;
}
return predicate;
}
5) ooePredicate
/**
* 是否能宣称选举结束
* <li>{@code n}获得了半数以上的票
* <li>通过了leader确认的检查
*
* @param recv
* map of received votes
* @param ooe
* map containing out of election votes (LEADING or FOLLOWING)
* @param n
* Notification
* @return
*/
protected booleanooePredicate(HashMap<Long, Vote> recv,HashMap<Long, Vote> ooe, Notification n) {
return (termPredicate(recv, new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state))
&& checkLeader(ooe, n.leader, n.electionEpoch));
}
五 选举示例分析
假设zookeeper集群的部署信息如下:
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
1. 集群启动时
根据上面的信息,由此我们可以得出集群启动时具有如下信息:
Server |
Sid/myid |
peerEpoch |
zxid |
Ip |
S1 |
1 |
0 |
0 |
192.168.1.1 |
S2 |
2 |
0 |
0 |
192.168.1.2 |
S3 |
3 |
0 |
0 |
192.168.1.3 |
Zookeeper每一台服务器启动时,默认状态为LOOKING状态,所以都会启动选leader流程。当然如果只有一台启动成功,无法满足选举的超过一半的条件,所以永远无法完成选leader逻辑。
1) S1服务器选举过程
为了描述的简单,假设集群只启动S1、S2服务器,我们看一下选举过程是怎样的。
a) S1发送提议
S1启动后首先就是进行Leader选举。S1首先logicalclock改为1,接着发送选票信息,S1的选票信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
0 |
0 |
b) S2收到S1选举提议
S2启动后就会进入选举流程,因为logicalclock=1,所以忽略同步时钟这个操作;此时S2的提议信息为:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
S2收到信息以后,将消息放在recvqueue中,接着会比较S1的提议和自己的提议信息,此时S2发现self.proposedLeader>S1.proposedLeader,所以S2回复提议信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
c) S1收到S2回复的选票信息
S1比较提议后发现自己推举的Leader小,所以将自己的提议信息更新为:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
d) S2再次收到S1的选票信息
此时S2不用更新提议信息,直接回复S1,回复的提议内容如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
e) S1收到S2的回复的选举提议
此时S1自己的选票信息为:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
收到S2的选票信息为:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
汇总后S1发现,目前收到的选票信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
0 |
0 |
2 |
0 |
0 |
因为满足超过半数的条件,所以S1会认为S2是leader,S1退出选举流程。
2) S1服务器收到的选票信息
a) S1未收到S2的选票信息
recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}
b) S1收到S2的选票信息
recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}
进行到这一步是S1会更新自己的提议信息,改成选举S2作为leader
c) S1收到S2的选票信息
recvset={1:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}
进行到这一步就可以认为S2服务器当选成为Leader了。
3) S2的leader选举
S2的选举过程和上面基本一样,唯一不同的地方是:S2一直选举的都是S2服务器,所以不用像S1一样选举过程中修改提议的Leader。
2. 新服务器加入集群
a) 初始条件假设
继续上面的示例,假设集群开始只启动了S1、S2服务器,运行一段时间以后S3服务器希望加入到集群中。
假设运行一段时间以后集群的情况如下:
Server |
Sid/myid |
peerEpoch |
Zxid(低32位) |
Ip |
Leader |
S1 |
1 |
10 |
900 |
192.168.1.1 |
否 |
S2 |
2 |
10 |
1000 |
192.168.1.2 |
是 |
S3新加入集群,加入时的情况如下:
Server |
Sid/myid |
peerEpoch |
Ip |
Leader |
S3 |
3 |
0 |
192.168.1.3 |
- |
b) S3发送提议
S3启动后会发起选主流程,发送的提议信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
3 |
0 |
0 |
c) S1收到S3的提议信息
因为S3已经是Follower,并且epoch、zxid均大于S3,所以回复的提议如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
900 |
10 |
d) S2收到S3的提议信息
因为S2已经是Leader,并且epoch、zxid均大于S3,所以回复的提议如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
1000 |
10 |
e) S3收到回复信息
S3统计选票信息会发现S1、S2都推荐S2作为Leader,并且S2已经是leader,因为超过半数推荐S2,所以此时S3也会将S2作为leader,结束选主流程
3. 又有新服务器加入集群
继续假设按照上面信息部署集群,又来了一个服务器S4希望加入集群,那么流程会如何呢?
选举过程中存在以下代码:
if (self.getVotingView().containsKey(n.sid)) {
// 选主逻辑
}
这意味着S1、S2、S3收到S4的选票请求,认为这不是当前集群的服务器,直接忽略选票信息。结论是:S4无法加入集群的。
4. 集群运行中重新选举
运行过程中如果Leader失效,或者超过半数服务器与Leader不同步等等情况都可能导致集群运行一段时间后,触发了选主流程。接下来我们以一种最为复杂的方式来介绍选举的完整过程。假设集群信息如下:
Server |
Sid/myid |
peerEpoch |
Zxid(高位_地位) |
Ip |
S1 |
1 |
200 |
200_900 |
192.168.1.1 |
S2 |
2 |
100 |
100_1000 |
192.168.1.2 |
S3 |
3 |
50 |
50_1950 |
192.168.1.3 |
Zxid的高位是peerEpoch,低位是自增的序列,所以如果peerEpoch大,那么zxid一定大。为了方便描述,zxid我改成“高位_地位”的形式了。
说明:一般而言,正常运行过程是不会出现以上假设的数据的,除非数据被人修改过,或者其他未知原因导致所有服务器的数据均错乱了。
因为集群中没有了leader,所以所有服务器均将重新进入选主流程。接下来我们就以S3服务器为例来讨论选主流程。
a) 第一轮选票
S3发出的选票信息信息为:
proposedLeader |
proposedZxid |
proposedEpoch |
3 |
50_1950 |
50 |
S2收到提议后回复的提议信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
100_1000 |
100 |
S1收到提议后回复的提议信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
200_900 |
200 |
b) 第一轮选票汇总
假设S3服务器先收到S2的回复,因为S3.proposedEpoch < S2.proposedEpoch,所以S3先更新提议的信息,然后重新发送提议(P1),新的提议如下:
proposedLeader |
proposedZxid |
proposedEpoch |
2 |
100_1000 |
100 |
紧接着接着S3又收到S1的回复,又发现S3.proposedEpoch < S1.proposedEpoch,所以S3再一次更新提议信息,然后发送新的提议(P2),新的提议如下:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
200_900 |
200 |
因为提议P2最终会覆盖提议P1,所以我们直接忽略S1、S2对P1的回复。直接讨论他们对P2的回复。
S1收到选票的提议以后,回复以下提议信息:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
200_900 |
200 |
S2收到选票的提议以后,回复以下提议信息:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
200_900 |
200 |
c) 第二轮选票汇总
S3收到的提议信息如下:
proposedLeader |
proposedZxid |
proposedEpoch |
1 |
200_900 |
200 |
1 |
200_900 |
200 |
1 |
200_900 |
200 |
S3汇总以后得出的结论是S1成为Leader,然后退出选举。