上一篇博客中,我们详细讨论了Zookeeper的Leader选举过程,接下来我们讨论一下Leader选举以后的事情,并了解zookeeper的集群管理原理。
提前说明:
- 本文主题虽然是讲述崩溃恢复模式,不过也会对广播模式的内容进行简单的描述。
- 为了在文中描述不至于太过啰嗦,所以对超过半数省略掉了一个限定返回。例如当出现类似于“超过半数follower与leader同步”,“收到超过半数follower的回复”这种描述时,这种描述不正确,因为这个半数计算的时候是包含leader的。即如果文中讲述“超过半数”其实指的是leader、follower在一起,超过半数,或者所有服务器(不包括observer)超过半数。另外,Leader一定与自己是同步的。
本文主要讨论内容如下:
- lead流程。
- followLeader流程。
- 主从数据如何同步。
- 如何保证集群数据一致性。
- 异常场景分析。
一 主从同步简化流程
1. 模型
目前在网上看到的主从不同原理基本上都是这样的:follower连接leader,将lastzxid告诉leader;leader根据last zxid确定同步点,然后将已经提交的协议发送给follower;follower同步完成后同通知leader。
2. 分析
此模型可以说它正确,也可以说它不正确,说它对是因为大体上确实是如此的;但如果剖根问底,同步的每一步具体是怎样的,这个图就显得不那么正确了。Zookeeper主从同步过程中有非常多消息,每种消息分别处理不同业务,并非上面简单的三步。
如果你只想知道个大概,那么就按照这个图理解就可以了,如果想了解更加详细的内容,搞清楚具体原理请继续阅读后续内容。
3. 问题
因为这个模型太过于简单,它无法回答以下几个问题:
- 如果follower的zxid大于leader的zxid将会如何?
这种情况是可能存在的,例如:之前leader是S1服务器,leader宕机后过了一段时间才重新加入到集群中来,加进来是已经选好了新的leader(例如S2服务器),而此时S2的zxid小于S1,此时会如何?
- Follower连不上leader,或者连接上了以后leader一直没有反应,集群将会如何?
例如:集群选举时leader正常,选举完成以后,在数据同步完成之前或数据同步过程中leader不可用了,那么集群将会如何?
- 数据同步时,如果有部分Follower同步的很慢,集群一直无法完成崩溃恢复这一过程会如何?
- Zookeeper集群是如何保证follower和leader同步以后才能接受请求的?
- Leader、follower的通信时有哪些类型,如何保证消息不错乱,通信过程安全可靠的?
接下来会从两个角度分析ZAB协议恢复模式中数据同步逻辑。
- 从leader、follower流程上分析。
- 从leader、follwer数据交互上分析。
二 流程分析
此部分从leader、follower的流程初步探讨zookeeper集群工作原理,因为存在大量的主从交互,所以从流程上不一定很好理解,所以这一部分从整体上介绍一下工作原理,更加详细的部分放在“主从交互详解”中介绍。
1. Leader
1) 流程图
a) leader主流程
因为上图中的多个子流程基本上都是在LearnerHandler中实现的,所以我将这几个这里其他的几个子流程放在LearnerHandler中描述。
2) 分析
a) 流程分析
- Leader每接受一个follower的链接,均会创建一个LearnerHandler对象,以后leader与此follower的交互均在LearnerHandler类中完成。
- 接着就是确定新的epoch,必须从半数以上follower处获取epoch信息以后,才能生成新的epoch,即这里会会导致leader等待。见Leader#getEpochToPropose方法。
- 然后是通知所有follower新的epoch,这个过程需要确保集群半数以上服务器达成一致,所以会等待。见leader#waitForEpochAck方法。
- 然后等待集群完成崩溃恢复中数据同步逻辑,当集群中超过半数以上服务器发送ACK信息给leader以后,leader认为同步完成,恢复模式结束,接着进入广播模式。见Leader#waitForNewLeaderAck方法。
- ZAB协议中恢复模式完成以后,集群进入广播模式,与之对应的是Leader会进入一个无限循环之中,在此循环中每隔一段时间统计一次同步的follower信息,并校验是否还有超过半数的follower与leader同步,如果少于一半,则退出lead流程。
b) 广播模式示例代码
boolean tickSkip = true;
while (true) {
//休眠
Thread.sleep(self.tickTime / 2);
if (!tickSkip){
self.tick++;
}
HashSet<Long> syncedSet = new HashSet<Long>();
//lock on the followers when we use it.
syncedSet.add(self.getId());
for (LearnerHandler f :getLearners()) {
// Synced setis used to check we have a supporting quorum,
// so only
// PARTICIPANT, notOBSERVER, learners should be used
if (f.synced() && f.getLearnerType()==LearnerType.PARTICIPANT) {
//更新同步集合列表
syncedSet.add(f.getSid());
}
//心跳维持
f.ping();
}
//check leader running status
if (!this.isRunning()){
shutdown("Unexpectedinternal error");
return;
}
if (!tickSkip&&!self.getQuorumVerifier().containsQuorum(syncedSet)) {
//超过半数以上服务器与leader不同步时,退出lead流程
// Lost quorum,shutdown
shutdown("Not sufficientfollowers synced, only synced with sids: [ " + getSidSetString(syncedSet)
+ "]");
// make sure the orderis the same!
// the leader goes tolooking
return;
}
tickSkip = !tickSkip;
}
2. LearnerHandler
1) 流程图
a) LearnerHandler主流程
b) 计算集群epoch
c) 集群更新epoch
d) 集群数据同步
e) 广播模式
2) 分析
a) 流程分析
LearnerHandler是leader中最为核心的一个类,他和Leader一起合作保证集群正常稳定的运行。LearnerHandler继承自ZooKeeperThread,由此可以看出它是一个线程,此线程负责和此follower的所有通信,具体来说需要负责以下几件事情:
- 取follower得epoch,以便于leader计算出集群新的epoch。此过程需要确保超过半数以上follower告知leader他们的acceptedEpoch。
- 通知集群服务器新的epoch。
- Follower收到epoch后会回复一个消息(条件是protocolVersion>=0x10000,不满足此条件的流程略有差别),等待leader收到超过半数的follower回复此消息。此过程也是需要等待,确保整个集群半数以上服务器epoch已经一致了。
- 根据follower的last zxid决定同步的起点,然后将需要同步的数据加入到一个队列中,新启动一个线程将这些数据发送给follower。此过程需要等待半数以上服务器完成与leader的同步。此过程结束则集群的崩溃恢复过程技术。
- 将进入无限循环,读取数据包、处理数据包,广播给follower,即进入广播模式。
3. Follower
1) 流程图
a) Follwer主流程
b) Epoch同步流程
c) 主从数据同步流程
d) 广播模式
2) 分析
Follower的逻辑较为简单,总体来说负责以下内容:
- 告知leader自己的acceptedEpoch,以便于leader计算出集群新的epoch。
- 接收leader新epoch信息,以便于和集群保持一致的epoch。
- 同步之前已经committed的数据,以便于和leader同步。
- 通知leader自己已经同步完成,以便于集群退出崩溃恢复模式。
- 数据同步完成以后,将进入无限循环,读取数据包、处理数据包,即进入广播模式,处理提议信息。
三 主从交互详解
接下来从主从交互的角度上详细讨论一下zookeeper工作原理。
1. 主从交互图
说明:
以上模型是根据zookeeper-3.4.10画出来的。
在zookeeper不同版本中,逻辑会稍有不同,不过从整体上而言基本相同,数据同步逻辑中这种区别具体体现在:Leader端会根据follower传输过来的protocolVersion,做一些特殊处理。例如: protocolVersion<0x10000时,不会在等待ack前发送LEADERINFO消息;在上面流程如第7步中,如果protocolVersion<0x10000,会先写一个NEWLEADER消息到follower等。
Observer的流程和follower类似,主要区别在于:不参与确认集群状态,不参与提议处理。
2. 模型详细分析
以下重点对模型中标注数字的模块进行详细说明,阅读时请参考上面模型图。
1) 1-从磁盘中加载数据
加载磁盘数据,获取lastProcessZxid用于后续生成newEpoch。每次leader诞生以后,都需要升级epoch。
2) 2-保证有超过半数的follower与leader建立连接
a) 创建LearnerHandler
leader生成的条件时需要半数以上follower选此服务器;选出leader后,需要保证有超过半数的follower能和leader建立通信。
Leader会创建LearnerCnxAcceptor线程,专门负责接收follower的连接,将leader与follower建立连接以后,则创建一个LearnerHandler线程。
LearnerCnxAcceptor#run示例代码如下:
while (!stop){
try {
Socket s= ss.accept();
// start with theinitLimit, once the ack isprocessed
// in LearnerHandlerswitch to the syncLimit
s.setSoTimeout(self.tickTime *self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is= new BufferedInputStream(s.getInputStream());
// 建立处理learner请求的处理器
LearnerHandler fh= new LearnerHandler(s, is, Leader.this);
fh.start();
} catch(SocketException e) {
if (stop) {
LOG.info("exceptionwhile shutting down acceptor: " + e);
// WhenLeader.shutdown() callsss.close(),
// the callto accept throws anexception.
// We catchand set stop to true.
stop= true;
} else {
throwe;
}
} catch (SaslException e) {
LOG.error("Exceptionwhile connecting to quorum learner", e);
}
}
b) 为集群生成新的epoch
通过Leader#getEpochToPropose方法计算epoch。这个方法会等待一段时间,如果这段时间内有超过半数follower成功与leader建立链接,那么将这些服务器中最大的epoch+1作为新的epoch。此过程就是。
Leader#getEpochToPropose示例代码如下:
private HashSet<Long> connectingFollowers= new HashSet<Long>();
/**
* 获取提议的epoch,此方法会等待足够多的follower进来,以确定epoch
* <li>如果lastAcceptedEpoch>leader.epoch,那么设置
* {@code leader.epoch =lastAcceptedEpoch+1};
* <li>如果没有超过半数的follower与leader通信,那么进入等待,直到超时或者足够的follower与leader建立通信。
*
* @param sid
* @param lastAcceptedEpoch
* : 最近接受的epoch,新leader选举出来以后的epoch不小于此值。
* @return
* @throws InterruptedException
* @throws IOException
*/
public long getEpochToPropose(long sid, long lastAcceptedEpoch)throws InterruptedException, IOException {
synchronized (connectingFollowers) {
if (!waitingForNewEpoch) {
return epoch;
}
if (lastAcceptedEpoch >= epoch){
epoch = lastAcceptedEpoch + 1;
}
connectingFollowers.add(sid);
QuorumVerifier verifier= self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId())&& verifier.containsQuorum(connectingFollowers)) {
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
long start = System.currentTimeMillis();
long cur = start;
long end = start + self.getInitLimit()* self.getTickTime();
while (waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
cur=System.currentTimeMillis();
}
if (waitingForNewEpoch) {
throw newInterruptedException("Timeoutwhile waiting for epoch from quorum");
}
}
return epoch;
}
}
c) 统一集群epoch
当protocolVersion>=0x10000时,leader会发送LEADERINFO消息并等待follower返回信息,并且需要保证在指定时间内,有超过半数的follwer回复了此消息。protocolVersion<0x10000时此过程逻辑稍微有一点不同,比较简单,所以暂不讨论。
private HashSet<Long> electingFollowers= new HashSet<Long>();
private boolean electionFinished = false;
/**
* 等待超过半数的follower ack
* <li>首先判断当前leader的epoch、lastZxid是否比参数{@link ss}更新,<br>
* <ol>
* <li>如果新,那么将follower的sid加入到electingFollowers列表</li>
* <li>如果ss更新,说明follower的epoch或者lastZxid比leader更大,
* 那么follower不能加入到集群中来,所以抛出错误信息。</li>
* </ol>
* </li>
* <li>检查electingFollowers集合,判断是否有超过半数的follower回复了leader,如果是,唤醒线程;
* 如果不是那么等待直到超时或者达到半数这个条件。</li>
*
* @param id
* @param ss
* @throws IOException
* @throws InterruptedException
*/
public void waitForEpochAck(long id,StateSummary ss) throwsIOException,InterruptedException {
synchronized (electingFollowers) {
if (electionFinished) {
return;
}
if (ss.getCurrentEpoch() != -1) {
if (ss.isMoreRecentThan(leaderStateSummary)){
// follower比 leader 更新(事物id更大),抛出错误信息
thrownew IOException(
"Followeris ahead of the leader, leader summary:" + leaderStateSummary.getCurrentEpoch()
+ " (currentepoch), " + leaderStateSummary.getLastZxid()+ " (last zxid)");
}
electingFollowers.add(id);
}
QuorumVerifier verifier= self.getQuorumVerifier();
if (electingFollowers.contains(self.getId())&& verifier.containsQuorum(electingFollowers)) {
electionFinished= true;
electingFollowers.notifyAll();
} else {
long start = System.currentTimeMillis();
long cur = start;
long end = start + self.getInitLimit()* self.getTickTime();
while (!electionFinished && cur < end) {
electingFollowers.wait(end - cur);
cur=System.currentTimeMillis();
}
if (!electionFinished) {
throw newInterruptedException("Timeoutwhile waiting for epoch to be acked byquorum");
}
}
}
}
当同时满足epoch验证和ack验证是,认为有超过半数的follower与leader建立连接了,不满足条件时会抛出异常,退出lead流程。
3) 3-等待同步完成的消息
leader与follower、observer建立链接以后,需要进行数据同步,同样的,当在指定时间内和leader同步的follower少于一半时,也会抛出异常,退出lead流程。
单个learner与leader是否同步的判断标准是:同步完成后follower会发送ACK消息给leader,leader收到此消息,认为这个follower和leader同步了。
Leader#waitForNewLeaderAck示例:
/**
* LeaderHandler在收到ACK消息时调用此方法,用于告知leader同步已经完成。
* <li>此方法需要保证超过一半的服务器与leader保持一致。
*
* @param sid
* @param learnerType
* @throws InterruptedException
*/
public void waitForNewLeaderAck(long sid, long zxid,LearnerType learnerType) throws InterruptedException{
synchronized (newLeaderProposal.ackSet) {
if (quorumFormed) {
return;
}
long currentZxid = newLeaderProposal.packet.getZxid();
if (zxid != currentZxid){
LOG.error("NEWLEADERACK from sid: " + sid + "is from adifferent epoch - current 0x"
+Long.toHexString(currentZxid) + " receieved 0x" + Long.toHexString(zxid));
return;
}
if (learnerType == LearnerType.PARTICIPANT) {
newLeaderProposal.ackSet.add(sid);
}
if (self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
quorumFormed= true;
newLeaderProposal.ackSet.notifyAll();
} else {
long start = System.currentTimeMillis();
long cur = start;
long end = start + self.getInitLimit()* self.getTickTime();
while (!quorumFormed && cur< end) {
newLeaderProposal.ackSet.wait(end- cur);
cur=System.currentTimeMillis();
}
if (!quorumFormed) {
throw newInterruptedException("Timeoutwhile waiting for NEWLEADER to be acked byquorum");
}
}
}
}
4) 4-进入广播模式
leader进入while无限循环,每次循环线休眠一段时间,然后检验是否有超过一半的follower与leader保持同步,是否被设置为停止(running=false),如果满足这两个条件之一,则退出lead流程。
5) 5-确认leader、follower确认开始
在Leader、LearnerHandler中我们多次提到主从需要计算出新的epoch,那么他是如何计算的呢?这里将详细讲述这个过程。
选主完成后,follower去链接leader,链接好以后需要完成以下几件事情。
- follower发送FOLLOWERINFO消息(带上AcceptedEpoch、protocolVersion= 0x10000)。
- leader等待超过半数的follower发送FOLLOWERINFO信息给leader,然后计算出epoch(如有疑问,见2步)。接着回复LEADERINFO消息(带上epoch、protocolVersion= 0x10000)。这里和leader#getEpochToPropose相对应。
- Follower收到消息以后回复ACKEPOCH消息(带上epoch、lastLoggedZxid)。
- 过程有点绕,我们解释一下这么做的原因:
- 集群已超过半数为基本条件,这意味着epoch计算时,leader需要通过超过半数follower的epoch来计算出一个新的epoch,所以就必须等待这个条件。
- Leader需要将计算出的新epoch告诉follower。这也比较好理解:一届leader选举出以后,以后所有的数据应该都发生在此届之中,所以应该统一集群的epoch。
- Follower再次回复信息,一来是做个ack,告知已经让epoch和leader的epoch保持一致了;二来是为后续确定同步点做准备。
6) 6-确保超过半数的followerack leader
5中的最后一步指的是一个follower回复ACKEPOCH消息,但是集群需要确保超过半数的follower ack,所以这里会有一个等待确认的过程。这里和leader中等待ack过程相对应。
等待leader收到半数以上的ack,是为了确保整个集群多数派已经更新了epoch。
7) 7-确定同步点
a) leader中确定同步节点的逻辑是:
- 首先设置type=SNAP
- 如果follower.lastZxid = leader.lastProcessedZxid,那么设置type=DIFF。
- 如果commitedLog为空,那么设置type=DIFF。
- 如果follower.lastZxid >leader.maxCommittedLog, 那么设置type= TRUNC。
- 如果leader.maxCommittedLog>= follower.lastZxid并且 leader.minCommittedLog <= follower.lastZxid,确定同步起始位置,并且将此后的所有提议消息(提议+commit提议两个消息)加入queuedPackets中。
- 将type、zxid发送个follower
示例代码如下:
ReentrantReadWriteLocklock = leader.zk.getZKDatabase().getLogLock();
ReadLockrl = lock.readLock();
try {
rl.lock();
final longmaxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final longminCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizingwith Follower sid:" + sid + "maxCommittedLog=0x"
+Long.toHexString(maxCommittedLog)+ " minCommittedLog=0x" +Long.toHexString(minCommittedLog)
+ "peerLastZxid=0x" + Long.toHexString(peerLastZxid));
LinkedList<Proposal> proposals= leader.zk.getZKDatabase().getCommittedLog();
if (peerLastZxid== leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()){
// 同伴(follower)的事务id和leader的事务id相同,说明follower和leader是同步的。
LOG.info("leaderandfollower are in sync, zxid=0x{}", Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if(proposals.size() != 0) {
LOG.debug("proposalsize is {}", proposals.size());
if ((maxCommittedLog >= peerLastZxid) &&(minCommittedLog <= peerLastZxid)) {
// follower有数据需要从leader处同步
LOG.debug("Sendingproposals to follower");
// as we look throughproposals, thisvariable keeps
// track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track ofwhether we are about tosend the first
// packet.
// Before sending thefirst packet, we haveto tell the
// learner
// whether to expect atrunc ora diff
boolean firstPacket = true;
// If we are here, wecan use committedLogto sync with
// follower. Then weonly need to decidewhether to
// send truncor not
packetToSend=Leader.DIFF;
zxidToSend = maxCommittedLog;
for(Proposal propose : proposals){
// skip theproposals the peer alreadyhas
if(propose.packet.getZxid()<= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else{
// If we are sending thefirst packet, figure
// out whether to trunc incase the follower has
// some proposals thatthe leader doesn't
if (firstPacket){
firstPacket = false;
//Does the peer have some proposals that
//the leader hasn't seen yet
if (prevProposalZxid< peerLastZxid) {
// send a trunc message beforesending
// the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
queuePacket(propose.packet);
QuorumPacket qcommit = newQuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null,
null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) {
LOG.debug("SendingTRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),Long.toHexString(updates));
packetToSend=Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandledproposal scenario");
}
} else {
// just let the statetransfer happen
LOG.debug("proposalsisempty");
}
LOG.info("Sending" +Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this,updates);
} finally {
rl.unlock();
}
b) Follower收到消息后进行一下处理:
- 如果type= DIFF,说明主从有差异,直接接收数据就好。
- 如果type= SNAP,那么说明主从差异太多了,直接从主下载snapshot数据。
- 如果type= TRUNC,说明从的zxid比主大,需要将follower中大的这部分提议清理掉。
- 如果type为其他值,那说明异常了,退出服务。
8) 8-数据同步
a) leader
- 向queuedPackets队列中加入NEWLEADER消息。
- 在LearnerHandler线程中启动一个新线程将queuedPackets中的消息发送给follower。
- 因为最后一个消息是NEWLEADER,Follower收到此消息时,回复一个ACK消息,leader根据此消息确定同步过程结束。
- 等待指定的时间,确保leader半数以上的follower都已经完成数据同步,指定时间内集群满足这个条件,则集群成形;不满足则抛出异常,退出lead流程。
- 向queuedPackets队列中加入UPTODATE消息。
b) follower
接收leader发送过来的数据。
当收到NEWLEADER消息时,表示目前所有消息已经全部同步完了,Follower回复ACK消息告知Leader目前最新的zxid。
Leader收到ACK消息后,等待集群成形,即崩溃恢复模式结束,成型后会给follower发送
NEWLEADER消息,follower收到NEWLEADER后退出同步逻辑。
c) 说明
截止此步骤结束,整个zookeeper集群方退出崩溃恢复模式。到此步骤成功前的任何一个步骤失败或者条件不满足,均退出lead流程、follower流程,即再次进入选举流程。
9) 9-进入广播模式
这一环节是zookeeper集群接收提议、提交提议、维持心跳等逻辑的环节,即zookeeper集群正常运行的环节。这一阶段的过程大概如下:
Leader收到请求后,将请求封装成提议发送给follower,消息类型为PROPOSAL
Follower收到PROPOSAL消息以后,回复Leader一个ACK消息。
Leader发现一个PROPOSAL有超过半数以上回复,则认为此消息可以commit了,那么发送COMMIT消息给follower。
这一环节还有几种消息:
- PING:主从心跳维持消息。
- REVALIDATE:延长session,保证主从会话有效。
- SYNC:将已提交的数据刷新到committedRequests集合。
- UPTODATE:这个环节应该受到整个消息,如果收到了留一条日志。
3. 总结
Zookeeper数据同步核心就是在于上面提到的各种消息。接下来我们总结一下上面的整个流程。
1) 建立链接
选举后Leader、Follower身份将被确立下来;接着Follower向Leader发起连接,Leader创建LearnerHandler。
2) 确定集群epoch
leader等待超过半数follower链接,计算出新的epoch。
protocolVersion>0x10000时,Leader会等待Follower确认epoch。
3) 同步数据
leader根据follower给的zxid信息,计算出需要同步的数据,并将这些数据放到queuedPackets队列中。
在发送已提交的提议数据前,会发送给follower一个消息,告诉follower进行一些预处理。例如:如果follower的zxid太大,那么截取部分消息;如果follower落后太远,那么直接从leader获取snapshot等。
启动线程发送queuedPackets中的提议消息。
提议消息发送完成后,Leader发送发送NEWLEADER消息告知follower同步已结束;follower收到此消息以后会回复ACK消息给Leader,表示知道已经同步结束了。注意,leader会等待至少半数以上的FollowerACK,因为follower同步过程耗时可能差别较大,这里需要等待一段时间,让follower跟上leader。
超过半数Follwer ACK,意味着整个集群大部分服务器已经和leader同步了,那么Zookeeper集群也就“成形”了。leader再给一个UPTODATE消息,告诉follower退出同步逻辑。
4) 进入广播模式
follower、LearnerHandler都将进入无限循环,处理REQUEST、发送提议、提交提议、维持集群心跳。