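- On the Follower side the source is as follows. It tries to connect to the Leader up to five times, sleeping one second after each failed attempt, and gives up (rethrowing the exception) when the fifth attempt fails.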
    protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
        sock = new Socket();
        sock.setSoTimeout(self.tickTime * self.initLimit);
        for (int tries = 0; tries < 5; tries++) {
            try {
                sock.connect(addr, self.tickTime * self.syncLimit);
                sock.setTcpNoDelay(nodelay);
                break;
            } catch (IOException e) {
                if (tries == 4) {
                    LOG.error("Unexpected exception",e);
                    throw e;
                } else {
                    LOG.warn("Unexpected exception, tries="+tries+
                            ", connecting to " + addr,e);
                    sock = new Socket();
                    sock.setSoTimeout(self.tickTime * self.initLimit);
                }
            }
            Thread.sleep(1000);
        }
        self.authLearner.authenticate(sock, hostname);
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
                sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
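The retry pattern in `connectToLeader()` can be isolated into a small helper. It makes up to five attempts, pauses after each failure, and rethrows on the last one. The sketch below is hypothetical: `RetrySketch`, `IoAction` and `withRetries` are illustration names, not ZooKeeper APIs. The demo also uses a 1 ms pause instead of ZooKeeper's 1000 ms.

```java
import java.io.IOException;

// Hypothetical helper (not a ZooKeeper class) mirroring the retry shape of
// connectToLeader(): up to maxTries attempts, a fixed pause after each failure,
// and the last IOException rethrown once the attempts are used up.
final class RetrySketch {
    @FunctionalInterface
    interface IoAction<T> {
        T run() throws IOException;
    }

    static <T> T withRetries(int maxTries, long sleepMs, IoAction<T> action)
            throws IOException, InterruptedException {
        for (int tries = 0; ; tries++) {
            try {
                return action.run();           // success: stop retrying
            } catch (IOException e) {
                if (tries >= maxTries - 1) {
                    throw e;                   // like "tries == 4" above: give up
                }
                // ZooKeeper logs a warning and creates a fresh Socket at this point
            }
            Thread.sleep(sleepMs);             // ZooKeeper sleeps 1000 ms here
        }
    }
}
```

With `maxTries = 5`, an action that fails twice and then succeeds is invoked three times. An action that always fails is invoked five times before its exception propagates.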
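- On the Leader side, the Leader waits for connections. Each time a client such as the Follower above asks to connect, the connection is picked up by the following `run()` loop. The loop accepts the socket and hands it to a new `LearnerHandler` thread.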
Leader.java:

    @Override
    public void run() {
        try {
            while (!stop) {
                // todo: the main logic: poll in this thread, and for every incoming connection start a dedicated thread (LearnerHandler)
                try {
                    // todo: take a connection from the ServerSocket
                    Socket s = ss.accept();
                    // start with the initLimit, once the ack is processed in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay); // todo: disable Nagle's algorithm
                    // todo: wrap the socket's input stream for reading
                    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                    // todo: create the handler that processes everything from this learner; LearnerHandler is itself a Thread
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
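Here is a minimal, self-contained sketch of the same thread-per-connection pattern. It assumes plain `java.net` sockets on loopback, with a single `int` standing in for the learner's first packet. `AcceptorSketch` and its fields are hypothetical names, and the 2000 ms timeout merely stands in for `tickTime * initLimit`.

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the accept loop: one handler thread per learner connection.
final class AcceptorSketch extends Thread {
    final ServerSocket ss;
    final BlockingQueue<Integer> firstInts = new LinkedBlockingQueue<>();
    private volatile boolean stop = false;

    AcceptorSketch() throws IOException {
        // port 0 = any free port on loopback (an assumption for the demo)
        ss = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                Socket s = ss.accept();
                s.setSoTimeout(2000);   // stands in for tickTime * initLimit
                s.setTcpNoDelay(true);  // disable Nagle's algorithm, like nodelay
                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
                new Thread(() -> handle(s, in)).start(); // plays the role of LearnerHandler
            } catch (IOException e) {
                if (!stop) {
                    System.err.println("accept failed: " + e);
                }
            }
        }
    }

    private void handle(Socket s, DataInputStream in) {
        try (Socket ignored = s) {
            firstInts.add(in.readInt()); // read the learner's first "packet"
        } catch (IOException e) {
            // a real handler would log and unregister itself from the leader
        }
    }

    void shutdown() throws IOException {
        stop = true;
        ss.close(); // unblocks accept() with a SocketException
    }
}
```

The accepting thread never blocks on a single learner. A slow Follower therefore only ties up its own handler thread.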
    protected long registerWithLeader(int pktType) throws IOException{
        /*
         * Send follower info, including last zxid and sid
         */
        long lastLoggedZxid = self.getLastLoggedZxid();
        QuorumPacket qp = new QuorumPacket();
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

        /*
         * Add sid to payload
         */
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());

        // todo: send the packet to the leader
        writePacket(qp, true);
        readPacket(qp);
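The logic that receives and parses this packet on the Leader lives in `LearnerHandler.run()`, shown further below. After the `LearnerHandler` receives the Follower's registration packet, it first deserializes it into a `QuorumPacket` and validates it. If the type is neither `Leader.FOLLOWERINFO` nor `Leader.OBSERVERINFO`, it returns immediately; otherwise it carries on.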
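The sketch below shows what the first packet carries and how the receiving side screens it. It assumes the `LearnerInfo` payload is an 8-byte sid followed by a 4-byte protocol version (0x10000). `PacketType`, `RegistrationSketch` and its methods are hypothetical. Unlike the real `LearnerHandler`, which logs an error and returns, this version throws on an unexpected type.

```java
import java.nio.ByteBuffer;

// Hypothetical packet types; ZooKeeper uses int constants on Leader instead.
enum PacketType { FOLLOWERINFO, OBSERVERINFO, LEADERINFO, ACKEPOCH }

// Hypothetical model of the registration packet. The 12-byte payload layout
// (long sid + int protocolVersion) is an assumption for illustration.
final class RegistrationSketch {
    static final int PROTOCOL_VERSION = 0x10000;

    static final class Packet {
        final PacketType type;
        final long zxid;
        final byte[] data;

        Packet(PacketType type, long zxid, byte[] data) {
            this.type = type;
            this.zxid = zxid;
            this.data = data;
        }
    }

    // Follower side: zxid = makeZxid(acceptedEpoch, 0), payload carries the sid.
    static Packet register(PacketType type, long sid, long acceptedEpoch) {
        byte[] payload = ByteBuffer.allocate(12).putLong(sid).putInt(PROTOCOL_VERSION).array();
        return new Packet(type, acceptedEpoch << 32, payload);
    }

    // Leader side: reject anything other than FOLLOWERINFO/OBSERVERINFO, else return the sid.
    static long acceptFirstPacket(Packet qp) {
        if (qp.type != PacketType.FOLLOWERINFO && qp.type != PacketType.OBSERVERINFO) {
            throw new IllegalArgumentException(
                    "First packet " + qp.type + " is not FOLLOWERINFO or OBSERVERINFO!");
        }
        return ByteBuffer.wrap(qp.data).getLong();
    }

    // The Follower's last accepted epoch is the high 32 bits of the zxid.
    static long epochOf(Packet qp) {
        return qp.zxid >> 32;
    }
}
```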
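This brings in the concept of the epoch. A zxid is 64 bits long. Its high 32 bits hold the epoch, i.e. which generation of Leader issued it. A Leader may go down because of network or other problems, so each Leader has its own epoch number, counting up 1, 2, and so on. Once a Leader dies, the newly elected Leader gets an updated epoch that is always larger than the old Leader's. The low 32 bits count which proposal the current Leader has issued.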
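The bit layout can be checked with a few lines of Java. The helper below is a standalone sketch whose methods do what `ZxidUtils.makeZxid` and `ZxidUtils.getEpochFromZxid` are used for in the code in this post. The class name `ZxidSketch` and the method `counterOf` are hypothetical.

```java
// Standalone sketch of the zxid layout: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

For example, `makeZxid(2, 5)` is `0x200000005`. Because the epoch sits in the high bits, every zxid of a newer epoch compares greater than every zxid of an older one, as long as epochs stay below 2^31.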
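Let's see how this is handled. The Leader takes the largest `lastAcceptedEpoch` among the connecting servers, itself included, and adds 1 to get the new epoch. That value is chosen by the Leader alone, though. Will the Followers accept it? To find out, step into `leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);`.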
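If the set of connecting servers does not yet contain a quorum (including the Leader itself), the current thread waits on `connectingFollowers`. So when is it woken up? Whenever another Follower in the cluster starts, it repeats the logic above and reaches this method again for the quorum check. That call may complete the quorum and wake the waiting threads. If no quorum forms within `initLimit * tickTime`, an `InterruptedException` is thrown.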
    if (connectingFollowers.contains(self.getId()) &&
            verifier.containsQuorum(connectingFollowers)) {
        waitingForNewEpoch = false;
        self.setAcceptedEpoch(epoch);
        connectingFollowers.notifyAll();
    } else {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.getInitLimit()*self.getTickTime();
        while(waitingForNewEpoch && cur < end) {
            connectingFollowers.wait(end - cur);
            cur = Time.currentElapsedTime();
        }
        if (waitingForNewEpoch) {
            throw new InterruptedException("Timeout while waiting for epoch from quorum");
        }
    }
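The propose-then-wait-for-quorum logic can be modeled in isolation. This is a simplified sketch, not ZooKeeper's `Leader.getEpochToPropose()`. `EpochProposer` is a hypothetical class, a plain majority of `clusterSize` replaces the `QuorumVerifier`, and a timeout in milliseconds replaces `initLimit * tickTime`. Like the excerpt above, it throws `InterruptedException` when no quorum forms in time.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of Leader.getEpochToPropose(); not ZooKeeper's API.
final class EpochProposer {
    private final long leaderSid;        // myid of the leader itself
    private final int clusterSize;       // voting servers; a simple majority is assumed
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1;             // candidate epoch: max lastAcceptedEpoch seen + 1
    private boolean waitingForNewEpoch = true;

    EpochProposer(long leaderSid, int clusterSize) {
        this.leaderSid = leaderSid;
        this.clusterSize = clusterSize;
    }

    long propose(long sid, long lastAcceptedEpoch, long timeoutMs) throws InterruptedException {
        synchronized (connecting) {
            if (!waitingForNewEpoch) {
                return epoch;                        // a quorum already agreed
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;       // stay strictly above every epoch seen
            }
            connecting.add(sid);
            boolean quorum = connecting.contains(leaderSid)
                    && connecting.size() > clusterSize / 2;
            if (quorum) {
                waitingForNewEpoch = false;
                connecting.notifyAll();              // wake the callers blocked below
            } else {
                long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
                long remainingMs = timeoutMs;
                while (waitingForNewEpoch && remainingMs > 0) {
                    connecting.wait(remainingMs);    // the loop also absorbs spurious wakeups
                    remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }
}
```

Suppose a three-server cluster where the leader (sid 1, accepted epoch 3) and a follower (sid 2, accepted epoch 5) both call `propose`. Whichever arrives second completes the majority and wakes the other. Both then return 6, i.e. max(lastAcceptedEpoch) + 1.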
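Next, the Leader replies to the Follower with a packet whose header is `Leader.LEADERINFO` (not an ACK packet). Its zxid is `makeZxid(newEpoch, 0)`, which tells the Follower that from now on its transactions start from this zxid. The Leader then reads the Follower's `ACKEPOCH` and waits until a quorum has acknowledged the epoch.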
    public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            // todo: read the packet sent by the follower
            ia.readRecord(qp, "packet");
            // todo: the header of the Follower's first (registration) packet is Leader.FOLLOWERINFO
            // todo: the leader returns immediately on any packet that is neither FOLLOWERINFO nor OBSERVERINFO
            if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }
            . . .
            // get the Follower's last accepted epoch
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            // todo: the leader uses this method to pick the largest epoch among the followers (and adds 1 to it)
            // todo: this.getSid() is the learner's myid
            // todo: lastAcceptedEpoch is that learner's last accepted epoch
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            . . .
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                // todo: after receiving the learner's data, the leader replies with a LEADERINFO packet
                // todo: carrying the new epoch
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,
                        ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                // todo: wait for the ACKEPOCH acknowledgements from a quorum of learners
                leader.waitForEpochAck(this.getSid(), ss);
            }
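Back on the Follower, `registerWithLeader()` receives the Leader's response and parses out the Leader's `leaderProtocolVersion`. It then compares the new epoch with its own acceptedEpoch and replies to the Leader with a packet whose header is `Leader.ACKEPOCH`: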
    protected long registerWithLeader(int pktType) throws IOException{
        . . . .
        readPacket(qp);
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch
                        + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
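The Follower's reply rule fits in a small function. In this hypothetical sketch (`AckEpochSketch` is not a ZooKeeper class), the returned `int` is the value that would go into the four `epochBytes`. An `IllegalStateException` stands in for the `IOException` thrown above.

```java
// Hypothetical model of how a Follower answers LEADERINFO; not a ZooKeeper class.
final class AckEpochSketch {
    long acceptedEpoch;          // highest epoch this follower has agreed to
    final long currentEpoch;     // epoch it is actually operating in

    AckEpochSketch(long acceptedEpoch, long currentEpoch) {
        this.acceptedEpoch = acceptedEpoch;
        this.currentEpoch = currentEpoch;
    }

    /** Returns the int that would be written into the ACKEPOCH payload. */
    int onLeaderInfo(long newEpoch) {
        if (newEpoch > acceptedEpoch) {
            acceptedEpoch = newEpoch;            // accept the proposed epoch
            return (int) currentEpoch;           // and report the epoch we are in
        } else if (newEpoch == acceptedEpoch) {
            return -1;                           // already acked: must not count as a new ack
        }
        // the real code throws IOException here
        throw new IllegalStateException("Leaders epoch, " + newEpoch
                + " is less than accepted epoch, " + acceptedEpoch);
    }
}
```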
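- Check whether the Follower's last zxid falls within the range [minCommittedLog, maxCommittedLog] of the Leader's committedLog. If it does, the Leader syncs it with a DIFF:
- First, the Leader does not send the sync packets one at a time. It puts them into a queue and sends them all together.
- Each queued proposal is followed by a QuorumPacket of type `Leader.COMMIT`. When the Follower receives this commit, it directly commits the corresponding request from the queue without sending any further acknowledgement, and the data sync is done.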
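- If the Follower's last zxid is greater than the Leader's largest zxid (maxCommittedLog), the Leader, without further ado, tells the Follower to TRUNC everything beyond the Leader's log. In plain terms, that part is deleted.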
- If the Follower's largest zxid is smaller than the Leader's smallest zxid (minCommittedLog), a snapshot (SNAP) sync is used.
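The decision in the list above can be sketched as a function. `SyncMode`, `SyncPlan` and `decide` are hypothetical names, and the sketch covers only the branches visible in the excerpt that follows:

- a peer that is already in sync gets an empty DIFF;
- a peer inside [minCommittedLog, maxCommittedLog] gets a DIFF;
- a peer beyond maxCommittedLog gets a TRUNC;
- everything else falls back to SNAP.

```java
// Hypothetical simplification of how LearnerHandler picks the sync mode.
enum SyncMode { DIFF, TRUNC, SNAP }

final class SyncPlan {
    final SyncMode mode;
    final long zxidToSend;

    SyncPlan(SyncMode mode, long zxidToSend) {
        this.mode = mode;
        this.zxidToSend = zxidToSend;
    }

    static SyncPlan decide(long peerLastZxid, long leaderLastZxid,
                           long minCommittedLog, long maxCommittedLog, boolean haveProposals) {
        if (peerLastZxid == leaderLastZxid) {
            return new SyncPlan(SyncMode.DIFF, peerLastZxid);          // in sync: empty diff
        }
        if (haveProposals) {
            if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
                // replay from committedLog (the proposal loop may still turn this into a TRUNC)
                return new SyncPlan(SyncMode.DIFF, maxCommittedLog);
            }
            if (peerLastZxid > maxCommittedLog) {
                return new SyncPlan(SyncMode.TRUNC, maxCommittedLog);  // follower is ahead: cut it back
            }
        }
        return new SyncPlan(SyncMode.SNAP, leaderLastZxid);             // too far behind: full snapshot
    }
}
```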
    if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
        // Follower is already sync with us, send empty diff
        LOG.info("leader and follower are in sync, zxid=0x{}",
                Long.toHexString(peerLastZxid));
        packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
    } else if (proposals.size() != 0) {
        . . .
        if ((maxCommittedLog >= peerLastZxid)
                && (minCommittedLog <= peerLastZxid)) {
            LOG.debug("Sending proposals to follower");

            // as we look through proposals, this variable keeps track of previous
            // proposal Id.
            long prevProposalZxid = minCommittedLog;

            // Keep track of whether we are about to send the first packet.
            // Before sending the first packet, we have to tell the learner
            // whether to expect a trunc or a diff
            boolean firstPacket=true;

            // If we are here, we can use committedLog to sync with
            // follower. Then we only need to decide whether to
            // send trunc or not
            packetToSend = Leader.DIFF;
            zxidToSend = maxCommittedLog;

            for (Proposal propose: proposals) {
                // skip the proposals the peer already has
                if (propose.packet.getZxid() <= peerLastZxid) {
                    prevProposalZxid = propose.packet.getZxid();
                    continue;
                } else {
                    // If we are sending the first packet, figure out whether to trunc
                    // in case the follower has some proposals that the leader doesn't
                    if (firstPacket) {
                        firstPacket = false;
                        // Does the peer have some proposals that the leader hasn't seen yet
                        if (prevProposalZxid < peerLastZxid) {
                            // send a trunc message before sending the diff
                            packetToSend = Leader.TRUNC;
                            zxidToSend = prevProposalZxid;
                            updates = zxidToSend;
                        }
                    }
                    // todo: put it into the queue (not sent yet)
                    queuePacket(propose.packet);
                    // todo: this is the commit the leader sends to the learner.
                    // todo: the learner does not need to acknowledge it; it syncs the data directly
                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                            null, null);
                    queuePacket(qcommit);
                }
            }
        } else if (peerLastZxid > maxCommittedLog) {
            // todo: the learner's last zxid is larger than the largest zxid the leader has
            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(updates));

            packetToSend = Leader.TRUNC;
            zxidToSend = maxCommittedLog;
            updates = zxidToSend;
        } else {
            LOG.warn("Unhandled proposal scenario");
        }
        . .
    bufferedOutput.flush();
    //Need to set the zxidToSend to the latest zxid
    if (packetToSend == Leader.SNAP) {
        zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    }
    oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
    bufferedOutput.flush();

    /* if we are not truncating or sending a diff just send a snapshot */
    if (packetToSend == Leader.SNAP) {
        LOG.info("Sending snapshot last zxid of peer is 0x"
                + Long.toHexString(peerLastZxid) + " "
                + " zxid of leader is 0x"
                + Long.toHexString(leaderLastZxid)
                + "sent zxid of db as 0x"
                + Long.toHexString(zxidToSend));
        // Dump data to peer
        // todo: sync from the snapshot: serialize the leader's database
        leader.zk.getZKDatabase().serializeSnapshot(oa);
        // todo: the snapshot is written straight to the socket, followed by a signature
        oa.writeString("BenWasHere", "signature");
    }
    bufferedOutput.flush();

    // Start sending packets
    // todo: start a new thread that sends the packets queued above
    new Thread() {
        public void run() {
            Thread.currentThread().setName(
                    "Sender-" + sock.getRemoteSocketAddress());
            try {
                sendPackets();
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption",e);
            }
        }
    }.start();
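The DIFF loop above boils down to three steps: skip what the peer already has; when the first missing proposal is found, decide between TRUNC and DIFF; and queue a PROPOSAL/COMMIT pair for every missing transaction. Below is a hypothetical sketch in which strings (with hex zxids) stand in for `QuorumPacket`s. It assumes a non-empty proposal list and leaves out the `NEWLEADER` packet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the DIFF loop: packets are collected into a list (the queue)
// rather than written immediately; strings stand in for QuorumPackets.
final class DiffQueueSketch {
    static List<String> build(long peerLastZxid, long minCommittedLog, List<Long> proposalZxids) {
        List<String> queued = new ArrayList<>();
        long maxCommittedLog = proposalZxids.get(proposalZxids.size() - 1);
        String header = "DIFF " + Long.toHexString(maxCommittedLog);
        long prevProposalZxid = minCommittedLog;
        boolean firstPacket = true;
        for (long zxid : proposalZxids) {
            if (zxid <= peerLastZxid) {
                prevProposalZxid = zxid;           // the follower already has this one
                continue;
            }
            if (firstPacket) {
                firstPacket = false;
                if (prevProposalZxid < peerLastZxid) {
                    // the follower holds a proposal the leader never saw: truncate first
                    header = "TRUNC " + Long.toHexString(prevProposalZxid);
                }
            }
            queued.add("PROPOSAL " + Long.toHexString(zxid));
            queued.add("COMMIT " + Long.toHexString(zxid));
        }
        queued.add(0, header); // the header packet goes out before the queued packets
        return queued;
    }
}
```

For a follower at `0x100000003` and proposals `0x100000001`..`0x100000005`, the result is `DIFF 100000005` followed by PROPOSAL/COMMIT pairs for `...4` and `...5`. Suppose instead the follower is at `0x100000003` but the Leader's log jumps from `0x100000002` straight to `0x200000001`. Then the header becomes `TRUNC 100000002`.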
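On the Follower side, `syncWithLeader()` reads the first packet from the Leader and reacts to its type:
- If it is SNAP, the Follower clears its own ZKDatabase and then loads the Leader's snapshot.
- If it is TRUNC, the Follower deletes every logged transaction after the zxid sent by the Leader (the ones the Leader does not have) and then reloads its database.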
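- If it is DIFF, nothing is applied yet (`snapshotNeeded = false`). As in the other two cases, the Follower then enters the `while (self.isRunning()) {..}` loop, and it leaves this loop only after receiving `Leader.UPTODATE`.
- The code below shows that the Follower does not consume the requests sent by the Leader right away. It has not finished starting up, so there is no Processor to hand them to yet. It therefore starts up first, at `zk.startup()` in the code below.
- After startup, these requests are loaded into memory, which completes the data sync.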
    protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
        // todo: for a DIFF sync we don't need to take a snapshot, because the transactions
        // todo: are applied on top of the existing snapshot; SNAP or TRUNC do need a snapshot
        boolean snapshotNeeded = true;
        // todo: read one packet from the leader
        readPacket(qp);
        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
        synchronized (zk) {
            // todo: DIFF
            if (qp.getType() == Leader.DIFF) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                // todo: flip this flag; further down, writeToTxnLog is set to its negation
                snapshotNeeded = false;
            }
            else if (qp.getType() == Leader.SNAP) {
                // todo: snapshot
                LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // clear our own database and read
                zk.getZKDatabase().clear();
                // todo: leaderIs is the stream from the leader; deserialize the snapshot from it
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");
                }
                // todo: record the leader's zxid as this Follower's last processed zxid
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            } else if (qp.getType() == Leader.TRUNC) {
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"
                        + Long.toHexString(qp.getZxid()));
                // todo: delete the log entries after qp.getZxid()
                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log "
                            + Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            }
            else {
                LOG.error("Got unexpected packet from leader "
                        + qp.getType() + " exiting ... " );
                System.exit(13);
            }
            zk.createSessionTracker();

            long lastQueued = 0;

            boolean isPreZAB1_0 = true;
            // todo: if we are not taking a snapshot, make sure the transactions are not applied
            // todo: to memory but written to the transaction log instead;
            // todo: in DIFF mode snapshotNeeded = false, so writeToTxnLog = true
            boolean writeToTxnLog = !snapshotNeeded;
            outerLoop:
            while (self.isRunning()) {
                // todo: keep reading packets in this loop; with DIFF we end up in the COMMIT case below
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    pif.hdr = new TxnHeader();
                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn("Got zxid 0x"
                                + Long.toHexString(pif.hdr.getZxid())
                                + " expected 0x"
                                + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();
                    packetsNotCommitted.add(pif);
                    break;
                case Leader.COMMIT:
                    if (!writeToTxnLog) { // todo: in DIFF mode this condition is false
                        pif = packetsNotCommitted.peekFirst();
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                        } else {
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else { // todo: DIFF takes this branch
                        // todo: add the zxid just read (qp) to the packetsCommitted LinkedList, used further below
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                    /*
                     * Only observer get this type of packet. We treat this
                     * as receiving PROPOSAL and COMMIT.
                     */
                    PacketInFlight packet = new PacketInFlight();
                    packet.hdr = new TxnHeader();
                    packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                    // Log warning message if txn comes out-of-order
                    if (packet.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn("Got zxid 0x"
                                + Long.toHexString(packet.hdr.getZxid())
                                + " expected 0x"
                                + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = packet.hdr.getZxid();
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.UPTODATE:
                    // todo: for the code below to use the lists filled above, we must leave this while loop;
                    // todo: the loop is exited in this case, i.e. only when the leader's UPTODATE arrives
                    if (isPreZAB1_0) {
                        zk.takeSnapshot();
                        self.setCurrentEpoch(newEpoch);
                    }
                    self.cnxnFactory.setZooKeeperServer(zk);
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                    // means this is Zab 1.0
                    // Create updatingEpoch file and remove it after current
                    // epoch is set. QuorumPeer.loadDataBase() uses this file to
                    // detect the case where the server was terminated after
                    // taking a snapshot but before setting the current epoch.
                    File updating = new File(self.getTxnFactory().getSnapDir(),
                            QuorumPeer.UPDATING_EPOCH_FILENAME);
                    if (!updating.exists() && !updating.createNewFile()) {
                        throw new IOException("Failed to create " +
                                updating.toString());
                    }
                    if (snapshotNeeded) {
                        zk.takeSnapshot();
                    }
                    self.setCurrentEpoch(newEpoch);
                    if (!updating.delete()) {
                        throw new IOException("Failed to delete " +
                                updating.toString());
                    }
                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                    isPreZAB1_0 = false;
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        // todo: the follower completes its startup here; from this point on the flow closely resembles standalone startup
        zk.startup();
        self.updateElectionVote(newEpoch);

        if (zk instanceof FollowerZooKeeperServer) {
            // (the body of this branch is not shown in this excerpt)
        } else if (zk instanceof ObserverZooKeeperServer) {
            ///////////////////////////////////////////////////////////////////////////////
            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
            for (PacketInFlight p : packetsNotCommitted) {
                Long zxid = packetsCommitted.peekFirst();
                if (p.hdr.getZxid() != zxid) {
                    // log warning message if there is no matching commit
                    // old leader send outstanding proposal to observer
                    LOG.warn("Committing " + Long.toHexString(zxid)
                            + ", but next proposal is "
                            + Long.toHexString(p.hdr.getZxid()));
                    continue;
                }
                packetsCommitted.remove();
                Request request = new Request(null, p.hdr.getClientId(),
                        p.hdr.getCxid(), p.hdr.getType(), null, null);
                request.txn = p.rec;
                request.hdr = p.hdr;
                ozk.commitRequest(request);
            }
            ///////////////////////////////////////////////////////////////////////////////
        } else {
            // New server type need to handle in-flight packets
            throw new UnsupportedOperationException("Unknown server type");
        }
    }
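The buffering behaviour of the loop above can be modeled without any I/O. This sketch is hypothetical: `SyncLoopSketch`, `Pkt` and the field names are illustrative. It keeps only zxids, ignores `INFORM` and the epoch-file bookkeeping, and silently skips a mismatched COMMIT where the real code logs a warning. It shows the key difference between the modes. After SNAP/TRUNC, COMMITs that arrive before `NEWLEADER` are applied immediately. With DIFF, every COMMIT is buffered until the loop exits on `UPTODATE`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the buffering in syncWithLeader(); packets reduced to (type, zxid).
final class SyncLoopSketch {
    enum Type { PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

    static final class Pkt {
        final Type type;
        final long zxid;

        Pkt(Type type, long zxid) {
            this.type = type;
            this.zxid = zxid;
        }
    }

    final List<Long> appliedDuringSync = new ArrayList<>();   // zk.processTxn(...) in the real code
    final Deque<Long> packetsNotCommitted = new ArrayDeque<>();
    final Deque<Long> packetsCommitted = new ArrayDeque<>();

    void run(boolean diff, List<Pkt> fromLeader) {
        boolean writeToTxnLog = diff;          // snapshotNeeded is false only for DIFF
        for (Pkt qp : fromLeader) {
            switch (qp.type) {
                case PROPOSAL:
                    packetsNotCommitted.add(qp.zxid);
                    break;
                case COMMIT:
                    if (!writeToTxnLog) {
                        Long next = packetsNotCommitted.peekFirst();
                        if (next != null && next == qp.zxid) {
                            appliedDuringSync.add(packetsNotCommitted.removeFirst()); // applied now
                        }
                    } else {
                        packetsCommitted.add(qp.zxid); // buffered until after startup
                    }
                    break;
                case NEWLEADER:
                    writeToTxnLog = true;              // from here on, go through the txn log
                    break;
                case UPTODATE:
                    return;                            // "break outerLoop": go start up
            }
        }
    }
}
```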
    // todo: sync data from the leader; the Follower's startup is also completed inside this method
    syncWithLeader(newEpochZxid);
    QuorumPacket qp = new QuorumPacket();
    // todo: loop forever in the follower, receiving packets from the leader and processing them
    while (this.isRunning()) {
        readPacket(qp);
        // todo: (handles the proposals the leader sends)
        processPacket(qp);
    }
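Once the Follower has finished syncing, its next dealings with the Leader come when a client sends a write request. The Follower must forward that write to the Leader, which broadcasts it. On the Leader, the forwarded packet is handled in `LearnerHandler`: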
    case Leader.REQUEST:
        // todo: after a follower receives a client write request it forwards it, and the leader's LearnerHandler enters this case
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong();
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();
        Request si;
        if(type == OpCode.sync){
            si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
        }
        si.setOwner(this);
        leader.zk.submitRequest(si);
        break;
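The payload read in this case is a `long` sessionId, an `int` cxid, an `int` type, and then the remaining request bytes. It can be round-tripped with `java.nio.ByteBuffer`. `ForwardedRequestSketch` is a hypothetical class. It skips the `OpCode.sync` check and the wrapping in `Request`/`LearnerSyncRequest` that the real code performs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: encodes/decodes the REQUEST payload layout read in the case above.
final class ForwardedRequestSketch {
    final long sessionId;
    final int cxid;
    final int type;
    final ByteBuffer body;     // remaining bytes: the serialized request itself

    private ForwardedRequestSketch(long sessionId, int cxid, int type, ByteBuffer body) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.type = type;
        this.body = body;
    }

    // Follower side (assumed to mirror the reads below): sessionId, cxid, type, request bytes.
    static byte[] encode(long sessionId, int cxid, int type, byte[] request) {
        return ByteBuffer.allocate(8 + 4 + 4 + request.length)
                .putLong(sessionId)
                .putInt(cxid)
                .putInt(type)
                .put(request)
                .array();
    }

    // Leader side: the same getLong()/getInt()/getInt()/slice() sequence as above.
    static ForwardedRequestSketch decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        return new ForwardedRequestSketch(sessionId, cxid, type, bb.slice());
    }
}
```

`slice()` gives the request body its own buffer starting at index 0. That is why the Leader can pass `bb` on to the `Request` without tracking the 16-byte header offset.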
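Summary: this post has walked through the stages of the data sync between a Follower and the Leader:
- Discovery: the Leader discovers the Followers and establishes communication with them.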
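- Synchronization: a Follower completes its data sync with the Leader mainly in one of two ways:
  - via the Leader's snapshot (SNAP);
  - via the committed, already-persisted requests held in the Leader's committedLog (DIFF, possibly preceded by a TRUNC), applied on top of the Follower's existing snapshot.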
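- Atomic broadcast: when a Follower receives a client's write request, it forwards the request to the Leader so that data consistency is preserved. The source is the `case Leader.REQUEST:` branch in the final infinite `while` loop of `LearnerHandler.run()`.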