2.3,leader投票开始(重点)
14,那么在这个SendWorker线程和这个RecvWorker这两个线程,就是一个用来给其他结点投票的线程,一个用来接收别的结点给当前结点投票的线程
接下来先看这个SendWorker的底层实现,由于下面sw.start()以及开启这个这个线程,那么就是主要他看的这个run方法就知道他底层的具体实现,就是一个发送选票的线程
@Override public void run() { //获取刚刚加入到map里面的阻塞队列 //就是说这个远端服务器有一个对应的队列,会通过socket管道连接将数据发送给别人 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq == null || isSendQueueEmpty(bq)) { ByteBuffer b = lastMessageSent.get(sid); if (b != null) { send(b); } } }
接下来查看这个RecvWorker的这个底层实现,就是一个接收选票的线程,会将接收到的数据存放在一个队列里面
@Override public void run() { threadCnt.incrementAndGet(); try { //输出流,读取一下别的机器发的数据 din.readFully(msgArray, 0, length); ByteBuffer message = ByteBuffer.wrap(msgArray); //将这个信息存放到这个阻塞队列里面 addToRecvQueue(new Message(message.duplicate(), sid)); } }
15,在向其他的这个服务端发送这个投票信息后,会对这个投票进行一个处理
class WorkerSender extends ZooKeeperThread{ @Override public void run(){ try { //获取刚刚加入到队列里面的投票 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; //处理这个投票 process(m); } } }
然后在这个处理投票process的方法里面,主要是有一个toSend方法
void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); manager.toSend(m.sid, requestBuffer); }
然后这个toSend方法里面有这个具体的处理这个投票的请求,如果是自己投自己的票,那么会将消息加入到一个队列里面,如果是投别的票,就会加入到其他的队列里面。
public void toSend(Long sid, ByteBuffer b){ //如果机器的id是当前的myid if (this.mySid == sid) { b.position(0); //加入到一个队列里面 addToRecvQueue(new Message(b.duplicate(), sid)); } else { ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY); ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq); if (oldq != null) { addToSendQueue(oldq, b); } else { addToSendQueue(bq, b); } connectOne(sid); } }
同时如上图,由于底层使用的是bio,所以会有阻塞问题,因此在客户端发送选票信息里面增加了很多的这个队列
16,机器除了向自己或者向别的机器投票之外,也会去接收其他机器给当前机器的选票。接下来加入这个11的else if的这个方法里面
//如果收到了选票,会验证这个选票是否有效 else if (validVoter(n.sid) && validVoter(n.leader)) { switch (n.state) { case LOOKING: if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ } break; } //重点pk逻辑,通过这个totalOrderPredicate方法实现 else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //第一轮没有得出谁输输谁赢 //如果对面的这个机器胜了,那么就会更新这个当前机器的选票 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } //将所有接收到的选票存在这个hashMap里面 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //获取到所有的这个票数之后呢,就会去校验一下这个底层的逻辑,通过这个termPredicate方法 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { } if (n == null) { //如果pk成功,那么设置成LEADING,失败则设置成FOLLOWING self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); //返回这个要选举成leader的主结点 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } }
2.4,结点之间的pk(重点)
17,接下来就是这个pk的逻辑,通过这个totalOrderPredicate方法实现,其选举周期就是下面的这个return的这个方法。就是说会先比较投票周期,哪个结点的投票周期大,那么哪个结点pk赢,如果周期一样,那么会比较这个zxid的这个事务id,谁的事务id大,谁胜利,如果一样,那么就比较这个serverId,就是对应机器的myid,谁的myid大,那么谁pk胜利。
//newId : 接收到的选票 protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ //先判断这个选举周期是否比当前机器的选举周期大 return ((newEpoch > curEpoch) || //如果周期一样就会比较这个事务id谁大 ((newEpoch == curEpoch) && ((newZxid > curZxid) || //如果事务id一样,那么会比较这个myId谁大 ((newZxid == curZxid) && (newId > curId))))); }
18,接下来查看这个16里面的这个termPredicate方法,就是做一个校验,就是说验证一下这个被投的票数有没有过半,如果过半,那就成为一个leader结点。
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) { //最后会通过一个遍历的方式将这个票存在一个set的一个集合里面 return voteSet.hasAllQuorums(); } public boolean hasAllQuorums() { for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) { //判断这个票数有没有过半 if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) return false; } return true; } public boolean containsQuorum(Set<Long> ackSet) { return (ackSet.size() > half); }
19,前面两个结点就可以确定一个follow和一个leader,如果有第三台机器加入,那么第三台机器处于一个looking状态,已经有了这个leader和follow的话就不会再参与选举了,这个第三台机器直接从这个looking状态变为这个follow状态。就是在这第三台机器向这个follow和leader发送这个票数之后,follow和leader都会返回这个leader的这个sid,那么这个第三台机器里面存储的这个myid也会变成leader主结点的sid。
2.5,leader挂了重新选举
20,这样的话选举的流程就结束了,接下来讲一下如果这个leader挂了,follow如何选举新的leader。在选举完成之后,leader会走leader的选票流程,follow会走follow的业务流程。首先先看leader的业务流程
case LEADING: try { setLeader(makeLeader(logFactory)); //选举完成之后,会做一个leader的一个业务逻辑 leader.lead(); setLeader(null); }
其leader主要做的事情如下
void lead() throws IOException, InterruptedException { try{ //将文件数据加入到内存里面 zk.loadData(); //又会开启一个线程 cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); } }
这个线程的run方法如下,就是会初始化一个socket连接,便于和这个follow从结点进行一个通信
@Override public void run() { try{ s = ss.accept(); } }
21,主结点建立了一个socket连接通道之后,再来看看这个follow这个从结点的业务流程。
case FOLLOWING: try { setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { } finally { follower.shutdown(); setFollower(null); updateServerState(); } break;
在这个followLeader这个方法里面,也会创建一个通信的通道,用于连接这个主结点
void followLeader() throws InterruptedException { try{ QuorumServer leaderServer = findLeader(); try{ //连接主结点 connectToLeader(leaderServer.addr, leaderServer.hostname); //将zxid发送给主结点 syncWithLeader(newEpochZxid); } } }
这样的话这个leader和这个follow就建立起这个通道用于通信连接了,主要用于做一些数据同步,心跳的发送等。
22,leader和这个follow主要是通过这个长连接的方式进行通信的,就是过一段时间就会发送一段这个心跳包,如果一段时间内leader没有再发心跳,那么这个follow感知这个leader挂了,follow又会重新进行一个leader选举的操作。
//leader会给每个follow发送ping命令 for (LearnerHandler f : getLearners()) { f.ping(); }
如果从结点那边没有收到这个ping命令,那么这个从结点就会抛出一个io流的一个异常,并且会更新这个从结点的follow的状态,将follow变为这个Looking状态,又可以进行一个重新的选举
try { sock.close(); } private synchronized void updateServerState(){ setPeerState(ServerState.LOOKING); }
二,总结
1,leader选举总结
zookeeper的每个结点,都会有一个发送投票的线程和一个接收投票的线程,每个结点都是通过这种bio,就是阻塞Io的方式获取这个投票。
然后每个结点都会有一个对应的状态,比如说LOOKING观望状态,LEADING,FOLLOWING状态。在一开始,每个结点都处于这个LOOKING状态。
每个结点的都会有一个myid和事务id,这两个会通过一个map保存,如(myid,zxid),只要有两个结点或者以上,那么就会进行一个主从结点的一个选举,就是会通过这个投票线程投票和接收投票的线程接收到其他线程给自己的投票。选举开始就是每个线程都会先投自己一票,然后再给别的机器进行一个投票,然后机器里面就会存一个自己的myid,事务id和别的机器的sid,事务id,那么两个(myid,zxid)就会进行一个pk,谁赢谁留下来。
pk过程就是会先比较投票周期,哪个结点的投票周期大,那么哪个结点pk赢,如果周期一样,那么会比较这个zxid的这个事务id,谁的事务id大,谁胜利,如果一样,那么就比较这个serverId,就是对应机器的myid,谁的myid大,那么谁pk胜利。结点会将赢的(sid,zxid)保存,也就是两台机器都会保存同一个(sid,zxid)
只要这个(sid,zxid)超过一半,那么这台sid的机器就会成为这个leader,另一台机器就成为follow,那么这样选举就完成。
如果有第三台机器进来,那么这台机器处于LOOKING状态,如果有leader和follow,那么就不会重新选举,而是将这个leader的(sid,zxid)直接和这台机器的(myid,zxid)进行一个替换,那么这台机器也可以成为一个follow结点。
2,主结点挂了选举方式
zookeeper在选举完这个主结点之后,会新建一个socker进行一个通信,比如说一些主从复制,发布ping心跳命令等。如果说这个心跳出现异常,就是主结点挂了,那么这个从结点可以通过这个ping命令来感知到这个主结点挂了,并且会抛出异常,之后所有的这个从结点的状态会发生改变,从之前的following状态变为looking状态,此时没有这个lead和follow结点,那么这些LOOKING状态的结点又会进行一轮新的选举。