ZAB协议恢复模式-数据同步

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: lead流程。 followLeader流程。 主从数据如何同步。 如何保证集群数据一致性。 异常场景分析。

上一篇博客中,我们详细讨论了Zookeeper的Leader选举过程,接下来我们讨论一下Leader选举以后的事情,并了解zookeeper的集群管理原理。

提前说明:

  • 本文主题虽然是讲述崩溃恢复模式,不过也会对广播模式的内容进行简单的描述。
  • 为了在文中描述不至于太过啰嗦,所以对超过半数省略掉了一个限定返回。例如当出现类似于“超过半数follower与leader同步”,“收到超过半数follower的回复”这种描述时,这种描述不正确,因为这个半数计算的时候是包含leader的。即如果文中讲述“超过半数”其实指的是leader、follower在一起,超过半数,或者所有服务器(不包括observer)超过半数。另外,Leader一定与自己是同步的。

 

本文主要讨论内容如下:

  • lead流程。
  • followLeader流程。
  • 主从数据如何同步。
  • 如何保证集群数据一致性。
  • 异常场景分析。

 

一  主从同步简化流程

1.  模型

目前在网上看到的主从不同原理基本上都是这样的:follower连接leader,将lastzxid告诉leader;leader根据last zxid确定同步点,然后将已经提交的协议发送给follower;follower同步完成后同通知leader。

2.  分析

此模型可以说它正确,也可以说它不正确,说它对是因为大体上确实是如此的;但如果剖根问底,同步的每一步具体是怎样的,这个图就显得不那么正确了。Zookeeper主从同步过程中有非常多消息,每种消息分别处理不同业务,并非上面简单的三步。

如果你只想知道个大概,那么就按照这个图理解就可以了,如果想了解更加详细的内容,搞清楚具体原理请继续阅读后续内容。

3.  问题

因为这个模型太过于简单,它无法回答以下几个问题:

  • 如果follower的zxid大于leader的zxid将会如何?

这种情况是可能存在的,例如:之前leader是S1服务器,leader宕机后过了一段时间才重新加入到集群中来,加进来是已经选好了新的leader(例如S2服务器),而此时S2的zxid小于S1,此时会如何?

  • Follower连不上leader,或者连接上了以后leader一直没有反应,集群将会如何?

例如:集群选举时leader正常,选举完成以后,在数据同步完成之前或数据同步过程中leader不可用了,那么集群将会如何?

  • 数据同步时,如果有部分Follower同步的很慢,集群一直无法完成崩溃恢复这一过程会如何?
  • Zookeeper集群是如何保证follower和leader同步以后才能接受请求的?
  • Leader、follower的通信时有哪些类型,如何保证消息不错乱,通信过程安全可靠的?

 

接下来会从两个角度分析ZAB协议恢复模式中数据同步逻辑。

  • 从leader、follower流程上分析。
  • 从leader、follwer数据交互上分析。

二  流程分析

此部分从leader、follower的流程初步探讨zookeeper集群工作原理,因为存在大量的主从交互,所以从流程上不一定很好理解,所以这一部分从整体上介绍一下工作原理,更加详细的部分放在“主从交互详解”中介绍。

1.  Leader

1)  流程图

a)  leader主流程

137a8267da42dfca3f5d1c473bab3db49ee60ca0

因为上图中的多个子流程基本上都是在LearnerHandler中实现的,所以我将这几个这里其他的几个子流程放在LearnerHandler中描述。

2)  分析

a)  流程分析
  • Leader每接受一个follower的链接,均会创建一个LearnerHandler对象,以后leader与此follower的交互均在LearnerHandler类中完成。
  • 接着就是确定新的epoch,必须从半数以上follower处获取epoch信息以后,才能生成新的epoch,即这里会会导致leader等待。见Leader#getEpochToPropose方法。
  • 然后是通知所有follower新的epoch,这个过程需要确保集群半数以上服务器达成一致,所以会等待。见leader#waitForEpochAck方法。
  • 然后等待集群完成崩溃恢复中数据同步逻辑,当集群中超过半数以上服务器发送ACK信息给leader以后,leader认为同步完成,恢复模式结束,接着进入广播模式。见Leader#waitForNewLeaderAck方法。
  • ZAB协议中恢复模式完成以后,集群进入广播模式,与之对应的是Leader会进入一个无限循环之中,在此循环中每隔一段时间统计一次同步的follower信息,并校验是否还有超过半数的follower与leader同步,如果少于一半,则退出lead流程。
b)  广播模式示例代码
boolean tickSkip = true;
while (true) {
   //休眠
   Thread.sleep(self.tickTime / 2);
   if (!tickSkip){
       self.tick++;
   }
   HashSet<Long> syncedSet = new HashSet<Long>();
   
    //lock on the followers when we use it.
   syncedSet.add(self.getId());
   
   for (LearnerHandler f :getLearners()) {
        // Synced setis used to check we have a supporting quorum,
        // so only
        // PARTICIPANT, notOBSERVER, learners should be used
       if (f.synced() && f.getLearnerType()==LearnerType.PARTICIPANT) {
          //更新同步集合列表
          syncedSet.add(f.getSid());
       }
       //心跳维持
       f.ping();
   }
   
    //check leader running status
   if (!this.isRunning()){
        shutdown("Unexpectedinternal error");
       return;
   }
   
   if (!tickSkip&&!self.getQuorumVerifier().containsQuorum(syncedSet)) {
        //超过半数以上服务器与leader不同步时,退出lead流程
        // Lost quorum,shutdown
        shutdown("Not sufficientfollowers synced, only synced with sids: [ " + getSidSetString(syncedSet)
              + "]");
        // make sure the orderis the same!
        // the leader goes tolooking
       return;
   }
    tickSkip = !tickSkip;
}


2.  LearnerHandler

1)  流程图

a)  LearnerHandler主流程

5d05f113c914dd425572509c41e61c20f0ea364d

b)  计算集群epoch

0e75f681062790346f968c0b96f8e23ab4ada874 

c)  集群更新epoch

f28ca5d59c36034f2db6aa27849eb7ccbdac33e4 

d)  集群数据同步

 61abbe2a75fb75ae861a3b84ab0f8b7846dbabad

e)  广播模式

 630bf639227f2cba65e9d946d55c522bcb4dd089

2)  分析

a)  流程分析

LearnerHandler是leader中最为核心的一个类,他和Leader一起合作保证集群正常稳定的运行。LearnerHandler继承自ZooKeeperThread,由此可以看出它是一个线程,此线程负责和此follower的所有通信,具体来说需要负责以下几件事情:

  • 取follower得epoch,以便于leader计算出集群新的epoch。此过程需要确保超过半数以上follower告知leader他们的acceptedEpoch。
  • 通知集群服务器新的epoch。
  • Follower收到epoch后会回复一个消息(条件是protocolVersion>=0x10000,不满足此条件的流程略有差别),等待leader收到超过半数的follower回复此消息。此过程也是需要等待,确保整个集群半数以上服务器epoch已经一致了。
  • 根据follower的last zxid决定同步的起点,然后将需要同步的数据加入到一个队列中,新启动一个线程将这些数据发送给follower。此过程需要等待半数以上服务器完成与leader的同步。此过程结束则集群的崩溃恢复过程技术。
  • 将进入无限循环,读取数据包、处理数据包,广播给follower,即进入广播模式。

 

3.  Follower

1)  流程图

a)  Follwer主流程

0c910b76c0b555c1b824e3674960625cdef62390

b)  Epoch同步流程

 2213f1989a78876a3f9db81fcd2a8c5ce114d03b

c)  主从数据同步流程

330e185a3ea9083c99b4a249fb9db581edbefd57

d)  广播模式

0a1be0b7f3ef67048bdb71ea8f16b3a67f078583 

2)  分析

Follower的逻辑较为简单,总体来说负责以下内容:

  • 告知leader自己的acceptedEpoch,以便于leader计算出集群新的epoch。
  • 接收leader新epoch信息,以便于和集群保持一致的epoch。
  • 同步之前已经committed的数据,以便于和leader同步。
  • 通知leader自己已经同步完成,以便于集群退出崩溃恢复模式。
  • 数据同步完成以后,将进入无限循环,读取数据包、处理数据包,即进入广播模式,处理提议信息。

 

三  主从交互详解

接下来从主从交互的角度上详细讨论一下zookeeper工作原理。

1.  主从交互图

366e6df5a1fafe81d43d22a903d34bb3226ecc0c

说明:

以上模型是根据zookeeper-3.4.10画出来的。

在zookeeper不同版本中,逻辑会稍有不同,不过从整体上而言基本相同,数据同步逻辑中这种区别具体体现在:Leader端会根据follower传输过来的protocolVersion,做一些特殊处理。例如: protocolVersion<0x10000时,不会在等待ack前发送LEADERINFO消息;在上面流程如第7步中,如果protocolVersion<0x10000,会先写一个NEWLEADER消息到follower等。

Observer的流程和follower类似,主要区别在于:不参与确认集群状态,不参与提议处理。

 

2.  模型详细分析

以下重点对模型中标注数字的模块进行详细说明,阅读时请参考上面模型图。

1) 1-从磁盘中加载数据

加载磁盘数据,获取lastProcessZxid用于后续生成newEpoch。每次leader诞生以后,都需要升级epoch。

2) 2-保证有超过半数的follower与leader建立连接

a)  创建LearnerHandler

leader生成的条件时需要半数以上follower选此服务器;选出leader后,需要保证有超过半数的follower能和leader建立通信。

Leader会创建LearnerCnxAcceptor线程,专门负责接收follower的连接,将leader与follower建立连接以后,则创建一个LearnerHandler线程。

LearnerCnxAcceptor#run示例代码如下:


   while (!stop){
       try {
          Socket s= ss.accept();
           // start with theinitLimit, once the ack isprocessed
           // in LearnerHandlerswitch to the syncLimit
          s.setSoTimeout(self.tickTime *self.initLimit);
          s.setTcpNoDelay(nodelay);
   
          BufferedInputStream is= new BufferedInputStream(s.getInputStream());
           // 建立处理learner请求的处理器
          LearnerHandler fh= new LearnerHandler(s, is, Leader.this);
          fh.start();
       } catch(SocketException e) {
          if (stop) {
               LOG.info("exceptionwhile shutting down acceptor: " + e);
   
               // WhenLeader.shutdown() callsss.close(),
               // the callto accept throws anexception.
               // We catchand set stop to true.
              stop= true;
          } else {
              throwe;
           }
       } catch (SaslException e) {
           LOG.error("Exceptionwhile connecting to quorum learner", e);
       }
   }

b)  为集群生成新的epoch

通过Leader#getEpochToPropose方法计算epoch。这个方法会等待一段时间,如果这段时间内有超过半数follower成功与leader建立链接,那么将这些服务器中最大的epoch+1作为新的epoch。此过程就是。

Leader#getEpochToPropose示例代码如下:

private HashSet<Long> connectingFollowers= new HashSet<Long>();
 
/**
 * 获取提议的epoch,此方法会等待足够多的follower进来,以确定epoch
 * <li>如果lastAcceptedEpoch>leader.epoch,那么设置
 * {@code leader.epoch =lastAcceptedEpoch+1};
 * <li>如果没有超过半数的follower与leader通信,那么进入等待,直到超时或者足够的follower与leader建立通信。
 * 
 * @param sid
 * @param lastAcceptedEpoch
 *           : 最近接受的epoch,新leader选举出来以后的epoch不小于此值。
 * @return
 * @throws InterruptedException
 * @throws IOException
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch)throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
          return epoch;
       }
       if (lastAcceptedEpoch >= epoch){
          epoch = lastAcceptedEpoch + 1;
       }
        connectingFollowers.add(sid);
      QuorumVerifier verifier= self.getQuorumVerifier();
       if (connectingFollowers.contains(self.getId())&& verifier.containsQuorum(connectingFollowers)) {
           waitingForNewEpoch = false;
          self.setAcceptedEpoch(epoch);
          connectingFollowers.notifyAll();
       } else {
          long start = System.currentTimeMillis();
          long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (waitingForNewEpoch && cur < end) {
              connectingFollowers.wait(end - cur);
              cur=System.currentTimeMillis();
           }
           if (waitingForNewEpoch) {
               throw newInterruptedException("Timeoutwhile waiting for epoch from quorum");
           }
       }
       return epoch;
   }
}


c)  统一集群epoch

当protocolVersion>=0x10000时,leader会发送LEADERINFO消息并等待follower返回信息,并且需要保证在指定时间内,有超过半数的follwer回复了此消息。protocolVersion<0x10000时此过程逻辑稍微有一点不同,比较简单,所以暂不讨论。


private HashSet<Long> electingFollowers= new HashSet<Long>();
private boolean electionFinished = false;
 
/**
 * 等待超过半数的follower ack
 * <li>首先判断当前leader的epoch、lastZxid是否比参数{@link ss}更新,<br>
 * <ol>
 * <li>如果新,那么将follower的sid加入到electingFollowers列表</li>
 * <li>如果ss更新,说明follower的epoch或者lastZxid比leader更大,
 * 那么follower不能加入到集群中来,所以抛出错误信息。</li>
 * </ol>
 * </li>
 * <li>检查electingFollowers集合,判断是否有超过半数的follower回复了leader,如果是,唤醒线程;
 * 如果不是那么等待直到超时或者达到半数这个条件。</li>
 * 
 * @param id
 * @param ss
 * @throws IOException
 * @throws InterruptedException
 */
public void waitForEpochAck(long id,StateSummary ss) throwsIOException,InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
          return;
       }
       if (ss.getCurrentEpoch() != -1) {
          if (ss.isMoreRecentThan(leaderStateSummary)){
               // follower比 leader 更新(事物id更大),抛出错误信息
              thrownew IOException(
                      "Followeris ahead of the leader, leader summary:" + leaderStateSummary.getCurrentEpoch()
                             + " (currentepoch), " + leaderStateSummary.getLastZxid()+ " (last zxid)");
           }
          electingFollowers.add(id);
       }
      QuorumVerifier verifier= self.getQuorumVerifier();
       if (electingFollowers.contains(self.getId())&& verifier.containsQuorum(electingFollowers)) {
          electionFinished= true;
          electingFollowers.notifyAll();
       } else {
          long start = System.currentTimeMillis();
           long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (!electionFinished && cur < end) {
              electingFollowers.wait(end - cur);
              cur=System.currentTimeMillis();
           }
          if (!electionFinished) {
               throw newInterruptedException("Timeoutwhile waiting for epoch to be acked byquorum");
           }
       }
   }
}


 

当同时满足epoch验证和ack验证是,认为有超过半数的follower与leader建立连接了,不满足条件时会抛出异常,退出lead流程。

 3) 3-等待同步完成的消息

leader与follower、observer建立链接以后,需要进行数据同步,同样的,当在指定时间内和leader同步的follower少于一半时,也会抛出异常,退出lead流程。

单个learner与leader是否同步的判断标准是:同步完成后follower会发送ACK消息给leader,leader收到此消息,认为这个follower和leader同步了。

Leader#waitForNewLeaderAck示例:


/**
 * LeaderHandler在收到ACK消息时调用此方法,用于告知leader同步已经完成。
 * <li>此方法需要保证超过一半的服务器与leader保持一致。
 *
 * @param sid
 * @param learnerType
 * @throws InterruptedException
 */
public void waitForNewLeaderAck(long sid, long zxid,LearnerType learnerType) throws InterruptedException{
 
    synchronized (newLeaderProposal.ackSet) {
 
       if (quorumFormed) {
          return;
       }
 
       long currentZxid = newLeaderProposal.packet.getZxid();
       if (zxid != currentZxid){
           LOG.error("NEWLEADERACK from sid: " + sid + "is from adifferent epoch - current 0x"
                  +Long.toHexString(currentZxid) + " receieved 0x" + Long.toHexString(zxid));
          return;
       }
 
       if (learnerType == LearnerType.PARTICIPANT) {
           newLeaderProposal.ackSet.add(sid);
       }
 
       if (self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
          quorumFormed= true;
          newLeaderProposal.ackSet.notifyAll();
       } else {
          long start = System.currentTimeMillis();
          long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (!quorumFormed && cur< end) {
              newLeaderProposal.ackSet.wait(end- cur);
              cur=System.currentTimeMillis();
           }
          if (!quorumFormed) {
               throw newInterruptedException("Timeoutwhile waiting for NEWLEADER to be acked byquorum");
           }
       }
   }
}

4) 4-进入广播模式

leader进入while无限循环,每次循环线休眠一段时间,然后检验是否有超过一半的follower与leader保持同步,是否被设置为停止(running=false),如果满足这两个条件之一,则退出lead流程。

 

5) 5-确认leader、follower确认开始

在Leader、LearnerHandler中我们多次提到主从需要计算出新的epoch,那么他是如何计算的呢?这里将详细讲述这个过程。

选主完成后,follower去链接leader,链接好以后需要完成以下几件事情。

  • follower发送FOLLOWERINFO消息(带上AcceptedEpoch、protocolVersion= 0x10000)。
  • leader等待超过半数的follower发送FOLLOWERINFO信息给leader,然后计算出epoch(如有疑问,见2步)。接着回复LEADERINFO消息(带上epoch、protocolVersion= 0x10000)。这里和leader#getEpochToPropose相对应。
  • Follower收到消息以后回复ACKEPOCH消息(带上epoch、lastLoggedZxid)。
  • 过程有点绕,我们解释一下这么做的原因:
  • 集群已超过半数为基本条件,这意味着epoch计算时,leader需要通过超过半数follower的epoch来计算出一个新的epoch,所以就必须等待这个条件。
  • Leader需要将计算出的新epoch告诉follower。这也比较好理解:一届leader选举出以后,以后所有的数据应该都发生在此届之中,所以应该统一集群的epoch。
  • Follower再次回复信息,一来是做个ack,告知已经让epoch和leader的epoch保持一致了;二来是为后续确定同步点做准备。

 

6) 6-确保超过半数的followerack leader

5中的最后一步指的是一个follower回复ACKEPOCH消息,但是集群需要确保超过半数的follower ack,所以这里会有一个等待确认的过程。这里和leader中等待ack过程相对应。

等待leader收到半数以上的ack,是为了确保整个集群多数派已经更新了epoch。

 

7) 7-确定同步点

a)  leader中确定同步节点的逻辑是:
  • 首先设置type=SNAP
  • 如果follower.lastZxid = leader.lastProcessedZxid,那么设置type=DIFF。
  • 如果commitedLog为空,那么设置type=DIFF。
  • 如果follower.lastZxid >leader.maxCommittedLog, 那么设置type= TRUNC。
  • 如果leader.maxCommittedLog>= follower.lastZxid并且 leader.minCommittedLog <= follower.lastZxid,确定同步起始位置,并且将此后的所有提议消息(提议+commit提议两个消息)加入queuedPackets中。
  • 将type、zxid发送个follower

示例代码如下:

ReentrantReadWriteLocklock = leader.zk.getZKDatabase().getLogLock();
ReadLockrl = lock.readLock();
try {
   rl.lock();
   final longmaxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
   final longminCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
    LOG.info("Synchronizingwith Follower sid:" + sid + "maxCommittedLog=0x"
           +Long.toHexString(maxCommittedLog)+ " minCommittedLog=0x" +Long.toHexString(minCommittedLog)
          + "peerLastZxid=0x" + Long.toHexString(peerLastZxid));
 
   LinkedList<Proposal> proposals= leader.zk.getZKDatabase().getCommittedLog();
 
   if (peerLastZxid== leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()){
        // 同伴(follower)的事务id和leader的事务id相同,说明follower和leader是同步的。
        LOG.info("leaderandfollower are in sync, zxid=0x{}", Long.toHexString(peerLastZxid));
       packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
   } else if(proposals.size() != 0) {
       LOG.debug("proposalsize is {}", proposals.size());
        if ((maxCommittedLog >= peerLastZxid) &&(minCommittedLog <= peerLastZxid)) {
           // follower有数据需要从leader处同步
           LOG.debug("Sendingproposals to follower");
 
           // as we look throughproposals, thisvariable keeps
           // track of previous
           // proposal Id.
           long prevProposalZxid = minCommittedLog;
 
           // Keep track ofwhether we are about tosend the first
          // packet.
           // Before sending thefirst packet, we haveto tell the
          // learner
           // whether to expect atrunc ora diff
          boolean firstPacket = true;
 
           // If we are here, wecan use committedLogto sync with
           // follower. Then weonly need to decidewhether to
           // send truncor not
          packetToSend=Leader.DIFF;
           zxidToSend = maxCommittedLog;
 
          for(Proposal propose : proposals){
               // skip theproposals the peer alreadyhas
              if(propose.packet.getZxid()<= peerLastZxid) {
                  prevProposalZxid = propose.packet.getZxid();
                  continue;
              } else{
                  // If we are sending thefirst packet, figure
                  // out whether to trunc incase the follower has
                  // some proposals thatthe leader doesn't
                  if (firstPacket){
                      firstPacket = false;
                      //Does the peer have some proposals that
                      //the leader hasn't seen yet
                      if (prevProposalZxid< peerLastZxid) {
                          // send a trunc message beforesending
                         // the diff
                         packetToSend = Leader.TRUNC;
                         zxidToSend = prevProposalZxid;
                         updates = zxidToSend;
                      }
                  }
                  queuePacket(propose.packet);
                  QuorumPacket qcommit = newQuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null,
                         null);
                  queuePacket(qcommit);
              }
           }
        } else if (peerLastZxid > maxCommittedLog) {
           LOG.debug("SendingTRUNC to follower zxidToSend=0x{} updates=0x{}",
                 Long.toHexString(maxCommittedLog),Long.toHexString(updates));
 
          packetToSend=Leader.TRUNC;
           zxidToSend = maxCommittedLog;
           updates = zxidToSend;
       } else {
           LOG.warn("Unhandledproposal scenario");
       }
   } else {
        // just let the statetransfer happen
        LOG.debug("proposalsisempty");
   }
 
   LOG.info("Sending" +Leader.getPacketType(packetToSend));
   leaderLastZxid = leader.startForwarding(this,updates);
 
} finally {
   rl.unlock();
}


b)  Follower收到消息后进行一下处理:
  • 如果type= DIFF,说明主从有差异,直接接收数据就好。
  • 如果type= SNAP,那么说明主从差异太多了,直接从主下载snapshot数据。
  • 如果type= TRUNC,说明从的zxid比主大,需要将follower中大的这部分提议清理掉。
  • 如果type为其他值,那说明异常了,退出服务。

 

8) 8-数据同步

a)  leader
  • 向queuedPackets队列中加入NEWLEADER消息。
  • 在LearnerHandler线程中启动一个新线程将queuedPackets中的消息发送给follower。
  • 因为最后一个消息是NEWLEADER,Follower收到此消息时,回复一个ACK消息,leader根据此消息确定同步过程结束。
  • 等待指定的时间,确保leader半数以上的follower都已经完成数据同步,指定时间内集群满足这个条件,则集群成形;不满足则抛出异常,退出lead流程。
  • 向queuedPackets队列中加入UPTODATE消息。

 

b)  follower

接收leader发送过来的数据。

当收到NEWLEADER消息时,表示目前所有消息已经全部同步完了,Follower回复ACK消息告知Leader目前最新的zxid。

Leader收到ACK消息后,等待集群成形,即崩溃恢复模式结束,成型后会给follower发送NEWLEADER消息,follower收到NEWLEADER后退出同步逻辑。

 

c)  说明

截止此步骤结束,整个zookeeper集群方退出崩溃恢复模式。到此步骤成功前的任何一个步骤失败或者条件不满足,均退出lead流程、follower流程,即再次进入选举流程。

 

9) 9-进入广播模式

这一环节是zookeeper集群接收提议、提交提议、维持心跳等逻辑的环节,即zookeeper集群正常运行的环节。这一阶段的过程大概如下:

Leader收到请求后,将请求封装成提议发送给follower,消息类型为PROPOSAL

Follower收到PROPOSAL消息以后,回复Leader一个ACK消息。

Leader发现一个PROPOSAL有超过半数以上回复,则认为此消息可以commit了,那么发送COMMIT消息给follower。

 

这一环节还有几种消息:

  • PING:主从心跳维持消息。
  • REVALIDATE:延长session,保证主从会话有效。
  • SYNC:将已提交的数据刷新到committedRequests集合。
  • UPTODATE:这个环节应该受到整个消息,如果收到了留一条日志。

 

3.  总结

Zookeeper数据同步核心就是在于上面提到的各种消息。接下来我们总结一下上面的整个流程。

1) 建立链接

选举后Leader、Follower身份将被确立下来;接着Follower向Leader发起连接,Leader创建LearnerHandler。

2) 确定集群epoch

leader等待超过半数follower链接,计算出新的epoch。

protocolVersion>0x10000时,Leader会等待Follower确认epoch。

3) 同步数据

leader根据follower给的zxid信息,计算出需要同步的数据,并将这些数据放到queuedPackets队列中。

在发送已提交的提议数据前,会发送给follower一个消息,告诉follower进行一些预处理。例如:如果follower的zxid太大,那么截取部分消息;如果follower落后太远,那么直接从leader获取snapshot等。

启动线程发送queuedPackets中的提议消息。

提议消息发送完成后,Leader发送发送NEWLEADER消息告知follower同步已结束;follower收到此消息以后会回复ACK消息给Leader,表示知道已经同步结束了。注意,leader会等待至少半数以上的FollowerACK,因为follower同步过程耗时可能差别较大,这里需要等待一段时间,让follower跟上leader。

超过半数Follwer ACK,意味着整个集群大部分服务器已经和leader同步了,那么Zookeeper集群也就“成形”了。leader再给一个UPTODATE消息,告诉follower退出同步逻辑。

 

4) 进入广播模式

follower、LearnerHandler都将进入无限循环,处理REQUEST、发送提议、提交提议、维持集群心跳。

 

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
SQL 缓存 算法
实时计算 Flink版产品使用合集之可以把初始同步完了用增量模式,但初始数据还是要同步,除非初始的数据同步换成用其他工具先同步过去吧,是这个意思吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 分布式计算 DataWorks
DataWorks不仅提供单表离线模式,还支持多种数据同步任务类型。
【2月更文挑战第31天】DataWorks不仅提供单表离线模式,还支持多种数据同步任务类型。这些类型包括整库离线同步(一次性全量同步、周期性全量同步、离线全增量同步、一次性增量同步、周期性增量同步)以及一键实时同步(一次性全量同步,实时增量同步)。此外,DataWorks还提供了数据类型转换的功能,您可以选择在源端和目标端使用相同的数据类型以避免数据类型转换,或者在源端和目标端使用不同的数据类型,然后在同步时手动转换数据类型。
88 6
|
7月前
|
存储 NoSQL 数据库连接
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
807 0
|
算法
ZooKeeper-集群-ZAB协议与数据同步
前言 在前面两篇文章中,我们认识了什么是ZooKeeper,ZooKeeper有哪些功能,ZooKeeper集群,以及ZooKeeper集群中的选举机制。那么在ZooKeeper集群中,数据是如何在节点间同步的呢?数据同步过程中又会产生哪些问题又是如何解决的呢? 在下面这篇文章中,将为大家讲解
201 0
|
弹性计算 大数据 关系型数据库
用脚本模式配置数据同步
本文主要用自定义的ECS来调度来解网络不可达的问题。通过使用脚本模式来解因为网络不可达导致的向导模式无法配置的问题。
3270 0
|
4月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
130 1
|
3月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
775 4
|
4月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步

热门文章

最新文章