深入理解 ZK集群的Leader选举(二)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 深入理解 ZK集群的Leader选举(二)

真正开始选举#


下面就去看一下quorumPeer.java的这个线程类的启动,部分run()方法的截取,我们关心它的lookForLeader()方法


while (running) {
switch (getPeerState()) {
    /**
     * todo 四种可能的状态, 经过了leader选举之后, 不同的服务器就有不同的角色
     * todo 也就是说,不同的服务器会会走动下面不同的分支中
     * LOOKING 正在进行领导者选举
     * Observing
     * Following
     * Leading
     */
case LOOKING:
    // todo 当为Looking状态时,会进入领导者选举的阶段
    LOG.info("LOOKING");
    if (Boolean.getBoolean("readonlymode.enabled")) {
        LOG.info("Attempting to start ReadOnlyZooKeeperServer");
        // Create read-only server but don't start it immediately
        // todo 创建了一个 只读的server但是不着急立即启动它
        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                logFactory, this,
                new ZooKeeperServer.BasicDataTreeBuilder(),
                this.zkDb);
        // Instead of starting roZk immediately, wait some grace(优雅) period(期间) before we decide we're partitioned.
        // todo 为了立即启动roZK ,在我们决定分区之前先等一会
        // Thread is used here because otherwise it would require changes in each of election strategy classes which is
        // unnecessary code coupling.
        //todo  这里新开启一条线程,避免每一个选举策略类上有不同的改变 而造成的代码的耦合
        Thread roZkMgr = new Thread() {
            public void run() {
                try {
                    // lower-bound grace period to 2 secs
                    sleep(Math.max(2000, tickTime));
                    if (ServerState.LOOKING.equals(getPeerState())) {
                        // todo 启动上面那个只读的Server
                        roZk.startup();
                    }
                } catch (InterruptedException e) {
                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                } catch (Exception e) {
                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                }
            }
        };
        try {
            roZkMgr.start();
            setBCVote(null);
            // todo 上面的代码都不关系,直接看它的 lookForLeader()方法
            // todo 直接点进去,进入的是接口,我们看它的实现类
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
            setPeerState(ServerState.LOOKING);
        } finally {
            // If the thread is in the the grace period, interrupt
            // to come out of waiting.
            roZkMgr.interrupt();
            roZk.shutdown();
        }


下面是lookForLeader()的源码解读

说实话这个方法还真的是挺长的,但是吧这个方法真的很重要,因为我们可以从这个方法中找到网络上大家针对Leader的选举总结的点点滴滴


第一点: 每次的投票都会先投自己一票,说白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());将自己的myid,最大的zxid,以及第几届封装起来,但是还有一个细节,就是在投自己的同时,还是会将存有自己信息的这一票通过socket发送给其他的节点

接受别人的投票是通过QuorumManagerrecvWorker线程类将投票添加进recvQueue队列中,投票给自己时,就不走这条路线了,而是选择直接将票添加进recvQueue队列中

在下面代码中存在一行HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); 这个map可以理解成一个小信箱,每一个节点都会维护一个信箱,这里面可能存放着自己投给自己的票,或者别人投给自己的票,或者别人投给别人的票,或者自己投给别人的票,通过统计这个信箱中的票数可以决定某一个节点是否可以成为leader,源码如下, 使用信箱中的信息,


// todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // todo 到这里说明接收到投票的机器已经是准leader了
        // Verify if there is any change in the proposed leader
        // todo 校验一下, leader有没有变动
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }
        if (n == null) {
                // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower
                // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支
                self.setPeerState((proposedLeader == self.getId()) ?
                        ServerState.LEADING : learningState());
                Vote endVote = new Vote(proposedLeader,
                        proposedZxid,
                        logicalclock.get(),
                        proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }


termPredicate()函数中有如下的逻辑,self.getQuorumVerifier().containsQuorum(set);它的实现如下,实际上就是在进行过半机制的检验,结论就是当某个节点拥有了集群中一半以上的节点的投票时,它就会把自己的状态修改成leading, 其他的节点根据自己的需求将状态该变成following或者observing


public boolean containsQuorum(Set<Long> set){
        return (set.size() > half);
    }


维护着一个时钟,标记这是第几次投票了logicalclock他是AutomicLong类型的变量,他有什么用呢? 通过下面的代码可以看到如下的逻辑,就是当自己的时钟比当前接收到投票的时钟小时,说明自己可能因为其他原因错过了某次投票,所以更新自己的时钟,重新判断投自己还是投别人, 同理,如果接收到的投票的时钟小于自己当前的时钟,说明这个票是没有意义的,直接丢弃不理会


if (n.electionEpoch > logicalclock.get()) {
                                // todo 将自己的时钟调整为更新的时间
                                logicalclock.set(n.electionEpoch);
                                // todo 清空自己的投票箱
                                recvset.clear();


那么根据什么判断是投给自己还是投给别人呢? 通过解析出票的封装类中封装的节点的信息,什么信息呢?zxid,myid,epoch 通常情况是epoch大的优先成为leader,一般来说epoch都会相同,所以zxid大的优先成为leader,如果zxid再相同,则myid大的优先成为leader


检查到别的节点比自己更适合当leader,会重新投票,选举更适合的节点


完整的源码


// todo 当前进入的是FastLeaderElection.java的实现类
public Vote lookForLeader() throws InterruptedException {
try {
    // todo 创建用来选举Leader的Bean
    self.jmxLeaderElectionBean = new LeaderElectionBean();
    MBeanRegistry.getInstance().register(
            self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
    self.start_fle = Time.currentElapsedTime();
}
try {
    // todo 每台服务器独有的投票箱 , 存放其他服务器投过来的票的map
    // todo long类型的key (sid)标记谁给当前的server投的票   Vote类型的value 投的票
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    int notTimeout = finalizeWait;
    synchronized (this) {
        //todo Automic 类型的时钟
        logicalclock.incrementAndGet();
        //todo 一开始启动时,入参位置的值都取自己的,相当于投票给自己
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    LOG.info("New election. My id =  " + self.getId() +
            ", proposed zxid=0x" + Long.toHexString(proposedZxid));
    // todo 发送出去,投票自己
    sendNotifications();
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    // todo 如果自己一直处于LOOKING的状态,一直循环
    while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        //todo  尝试获取其他服务器的投票的信息
        // todo 从接受消息的队列中取出一个msg(这个队列中的数据就是它投票给自己的票)
        // todo 在QuorumCxnManager.java中 发送的投票的逻辑中,如果是发送给自己的,就直接加到recvQueue,而不经过socket
        // todo 所以它在这里是取出了自己的投票
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        // todo 第一轮投票这里不为空
        if (n == null) {
            // todo 第二轮就没有投票了,为null, 进入这个分支
            // todo 进行判断 ,如果集群中有三台服务器,现在仅仅启动一台服务器,还剩下两台服务器没启动
            // todo 那就会有3票, 其中1票直接放到 recvQueue , 另外两票需要发送给其他两台机器的逻辑就在这里判断
            // todo 验证是通不过的,因为queueSendMap中的两条队列都不为空
            if (manager.haveDelivered()) {
                sendNotifications();
            } else {
                // todo 进入这个逻辑
                manager.connectAll();
            }
            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        } else if (validVoter(n.sid) && validVoter(n.leader)) {
            // todo 收到了其他服务器的投票信息后,来到下面的分支中处理
            /*
             * Only proceed if the vote comes from a replica in the
             * voting view for a replica in the voting view.
             * todo 仅当投票来自投票视图中的副本时,才能继续进行投票。
             */
            switch (n.state) {
                case LOOKING:
                    // todo 表示获取到投票的服务器的状态也是looking
                    // If notification > current, replace and send messages out
                    // todo 对比接收到的头片的 epoch和当前时钟先后
                    // todo 接收到的投票 > 当前服务器的时钟
                    // todo 表示当前server在投票过程中可能以为故障比其他机器少投了几次,需要重新投票
                    if (n.electionEpoch > logicalclock.get()) {
                        // todo 将自己的时钟调整为更新的时间
                        logicalclock.set(n.electionEpoch);
                        // todo 清空自己的投票箱
                        recvset.clear();
                        // todo 用别人的信息和自己的信息对比,选出一个更适合当leader的,如果还是自己适合,不作为, 对方适合,修改投票,投 对方
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                        // todo 接收到的投票 < 当前服务器的时钟
                        // todo 说明这个投票已经不能再用了
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                        // todo 别人的投票时钟和我的时钟是相同的
                        // todo 满足 totalOrderPredicate 后,会更改当前的投票,重新投票
                        /**
                         *   在 totalOrderPredicate 比较两者之间谁更满足条件
                         *   ((newEpoch > curEpoch) ||
                         *   ((newEpoch == curEpoch) &&
                         *   ((newZxid > curZxid) ||
                         *   ((newZxid == curZxid) &&
                         *   (newId > curId)))));
                         */
                        // todo 返回true说明 对方更适合当leader
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // todo 将自己的投票存放到投票箱子中
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // todo 到这里说明接收到投票的机器已经是准leader了
                        // Verify if there is any change in the proposed leader
                        // todo 校验一下, leader有没有变动
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower
                            // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock.get(),
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    // todo 禁止Observer参加投票
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));
                        if (ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));
                    if (ooePredicate(outofelection, outofelection, n)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
            }
        } else {
            if (!validVoter(n.leader)) {
                LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
            }
            if (!validVoter(n.sid)) {
                LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
            }
        }
    }
    return null;


经过如上的判断各个节点的就可以选举出不同的角色,再次回到QuorumPeer.javarun()中进行循环时,不再会进入case LOOKING:代码块了,而是按照自己不同的角色各司其职,完成不同的初始化启动

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
存储 算法 Java
深入理解 ZK集群的Leader选举(一)
深入理解 ZK集群的Leader选举(一)
367 0
|
调度
zookeeper-集群-选举机制
前言 上一篇文章中,我们简单的介绍了zookeeper产生的背景,数据模型中的4种znode,可以实现的功能等。接下来,在这篇文章中,我们将介绍zookeeper的集群以及选举机制。
297 0
|
API Apache
利用Zookeeper实现分布式应用的Leader选举
利用Zookeeper实现分布式应用的Leader选举
337 0
利用Zookeeper实现分布式应用的Leader选举
Zookeeper Leader选举机制
Zookeeper Leader选举机制
92 0
Zookeeper的Leader选举
Leader选举是保证分布式数据一致性的关键所在。Leader选举分为Zookeeper集群初始化启动时选举和Zookeeper集群运行期间Leader重新选举两种情况。在讲解Leader选举前先了解一下Zookeeper节点4种可能状态和事务ID概念。
148 0
Zookeeper的Leader选举
|
大数据 开发者
ZooKeeper 集群选举:非全新集群选举|学习笔记
快速学习 ZooKeeper 集群选举:非全新集群选举
229 0
|
算法
ZK源码阅读系列-ZK集群Leader选举解析
ZK服务端启动代码涉及很广,本文就集群下的zookeeper是怎么选举leader的进析。
232 0
ZK源码阅读系列-ZK集群Leader选举解析
|
分布式计算 资源调度 Hadoop
十、Zookeeper (leader)选举机制
十、Zookeeper (leader)选举机制
十、Zookeeper (leader)选举机制
|
消息中间件 算法 网络协议
【分布式】Zookeeper的Leader选举
前面学习了Zookeeper服务端的相关细节,其中对于集群启动而言,很重要的一部分就是Leader选举,接着就开始深入学习Leader选举。
176 0
【分布式】Zookeeper的Leader选举