zookeeper的leader选举原理和底层源码实现超级详解 1

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: zookeeper的leader选举原理和底层源码实现超级详解

一,zookeeper选举原理

1,源码下载

在这个https://github.com/apache/zookeeper里面把源码下载即可,这里推荐版本为3.5.8

源码下载完成之后,在这个zookeeper-server的模块下面,在version包下面新建一个info的接口

其内容如下,如果会有编译报错就加入这个接口,没有的话也可以不加。

public interface Info {
     int MAJOR = 1;
     int MINOR = 0;
     int MICRO = 0;
     String QUALIFIER = null;
     int REVISION = -1 ;
     String REVISION_HASH = "1";
     String BUILD_DATE = "2022‐09‐03";
}

2,zookeeper集群选举流程

2.1,zookeeper集群启动以及配置加载

1,由于这个zookeeper的启动是通过一个./zkServer.sh的脚本实现整个服务的启动的,

因此可以发现脚本里面主要是通过这个QuorumPeerMain 类来作为一个集群的主启动类,接下来就是主要分析一下这个类。

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

2,在这个类里面,有一个main的入口方法

@InterfaceAudience.Public
public class  {
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try { 
            main.initializeAndRun(args);
        } catch(Execption e){
            ...
        }
    }
}

3,接下来就是一个重点,主要查看这个initializeAndRun的这个方法,这个方法主要会解析一下配置文件,清理一些文件

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    //解析配置文件,主要是解析这个zoo.cfg的配置文件
    if (args.length == 1) {
        config.parse(args[0]);
    }
    //定时清理一些文件目录
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
  //如果是分布式集群环境,那么走这个逻辑
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    }
    //如果是单机状态,那么走这个逻辑
    else {
        ZooKeeperServerMain.main(args);
    }
}

4,那么可以查看这个分布式环境下的方法所走的流程,主要是查看这个runFromConfig方法,主要是会创建一个nio实例或者创建一个netty的一个工厂,将一些解析的文件存储到一个QuorumPeer的对象里面

public void runFromConfig(QuorumPeerConfig config) throws IOException,AdminServerException{
  try {
    ServerCnxnFactory cnxnFactory = null;
    ServerCnxnFactory secureCnxnFactory = null;
    if (config.getClientPortAddress() != null) {
            //创建nio或者netty工厂
        cnxnFactory = ServerCnxnFactory.createFactory();
            //监听客户端的端口
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns(),
                false);
    }
        quorumPeer = getQuorumPeer();
        //将解析出来的配置文件的值存到这个quorumPeer对象里面
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        //属性填充之后就进行一个初始化
        quorumPeer.initialize();
        //开启这个zookeeper的集群
        quorumPeer.start();
        quorumPeer.join();
    }
}

5,接下来主要查看这个start启动方法,会将已有的数据从磁盘加载到内存里面

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
     }
    //加载zookeeper里面已有的数据文件
  //比如一些快照文件,需要从磁盘加载到内存里面
    loadDataBase();
    //启动刚刚初始化的这个CNX的工厂
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    //和选举有关的算法
    startLeaderElection();
    //选举的具体实现
    super.start();
}

6,结点的几个状态,当集群没有选举出来这个leader的时候,这个结点处于一个LOOKING状态;当主结点选举出来之后,这个主结点是LEADING状态,这个从结点是FOLLOWING状态;

public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}

2.2,leader选举工作准备开始

7,接下来就是一个重点和这个选举主结点有关的方法startLeaderElection

synchronized public void startLeaderElection() {
    try {
        //观望状态
        if (getPeerState() == ServerState.LOOKING) {
            //初始化一个投票对象,先给自己投一票
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    //快速选举的一个算法
    this.electionAlg = createElectionAlgorithm(electionType);
}

8,这个选举的算法如下,这个默认传进来的参数为3。因此这个默认选用的就是这个FastLeaderElection的这个算法。主要是会初始化一个选举数据的一个管理器,然后会通过一个bio的方式对这个选举进行一个监听的操作,最后会有一个选举的具体的一个算法

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        //初始化一个QuorumCnxManager类
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            oldQcm.halt();
        }
        //创建一个线程
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            //开启线程,底层会去绑定一个端口,通过这个bio的方式进行一个选票的逻辑
            listener.start();
            //选举的逻辑
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

9,这个算法的逻辑方法FastLeaderElection的如下,主要就是在里面创建了几个链表类型的阻塞队列。阻塞队列就是为了后面加这个消息存放在这几个阻塞队列里面。

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

10,接下来会有一个fle.start();的这个方法,里面的主要方法实现如下

void start(){
    this.wsThread.start();
    this.wrThread.start();
}
Messenger(QuorumCnxManager manager) {
    //发起选票的线程
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    //接收选票的线程
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

11,接下来回到第五步里面的这个 super.start() 的这个方法,其run方法的主要的核心如下,就是一个具体的投票的一个业务逻辑。主要会判断当前机器的一个状态,是looking状态还是已经选举成功的状态

try {
    while (running) {
        switch (getPeerState()) {
        //默认是looking观望状态,还在找leader
        case LOOKING:
            LOG.info("LOOKING");
            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                final ReadOnlyZooKeeperServer roZk =
                    new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                        } catch (Exception e) {
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                   reconfigFlagClear();
                    if (shuttingDownLE) {
                       shuttingDownLE = false;
                       startLeaderElection();
                       }
                    //设置一个当前的投票,就是给自己投一票
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }                        
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e );
            } finally {
                observer.shutdown();
                setObserver(null);  
               updateServerState();
            }
            break;
        case FOLLOWING:
            try {
               LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
               LOG.warn("Unexpected exception",e);
            } finally {
               follower.shutdown();
               setFollower(null);
               updateServerState();
            }
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
        }
        start_fle = Time.currentElapsedTime();
    }
}

12,这个设置投票的方法如下,主要是在这个lookForLeader里面,从这一步开始,就是选票的正式开始

public Vote lookForLeader() throws InterruptedException {
  try {
        //创建两个集合
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    int notTimeout = finalizeWait;
        //构建一个同步块
    synchronized(this){
        logicalclock.incrementAndGet();
            //更新选票信息
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
        //发送选票通知
        sendNotifications();
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
            //从队列中获取消息,接收这个选票的信息
            Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
            //初始为空
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    //和其他端口建立这个bio的Socket连接
                    //每个结点在开启的时候都会开启一个socket连接
                    manager.connectAll();
                }
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
            }
            //如果收到了选票,判断这个选票是否合理
          else if (validVoter(n.sid) && validVoter(n.leader)) {
          }
        }
  }
}

初始这个选票的个数为空,然后会发送一个选票通知,具体实现如下

private void sendNotifications() {
    //获取全部可以发送消息的机器
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        //构造一条消息,向目标机器发送
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
        }
        //将消息加入到这个内存队列里面
        sendqueue.offer(notmsg);
    }
}

13,在这个客户端之间建立这个socket连接时,主要会通过下面这个方式实现。就是一般大的机器的sid会去连接小的sid,避免重复的双向连接。只要有一方连接,就可以实现双方的通信

private void handleConnection(Socket sock, DataInputStream din){
    try {
      protocolVersion = din.readLong();
      if (protocolVersion >= 0) { // this is a server id and not a protocol version
          sid = protocolVersion;
      } 
        else {
          try {
                //获取远端服务器连接信息
              InitialMessage init = InitialMessage.parse(protocolVersion, din);
                //获取远端服务器的sid
              sid = init.sid;
              electionAddr = init.electionAddr;
          } catch (InitialMessage.InitialMessageException ex) {
              LOG.error("Initial message parsing error!", ex);
              closeSocket(sock);
              return;
          }
        }
    }
    //如果请求的机器的sid小,那么会直接关闭这个连接
    if (sid < self.getId()) {
        closeSocket(sock);
    }
    else {
        //发送数据线程
        SendWorker sw = new SendWorker(sock, sid);
        //接收数据线程
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        //来一个连接,就会将这个连接存放在一个map里面去
        senderWorkerMap.put(sid, sw);
        //最后将这个选票的线程存放队列
        queueSendMap.putIfAbsent(sid,
                new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        sw.start();
        rw.start();
    }
}

8c45d445e473421f9b1dde9292cb5f20.png

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
33 1
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
45 1
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
52 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
52 0
|
3月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
5月前
|
存储 数据库
zookeeper 集群环境搭建及集群选举及数据同步机制
zookeeper 集群环境搭建及集群选举及数据同步机制
99 2
|
7月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
352 0
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
下一篇
DataWorks