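I. How ZooKeeper leader election works
1. Downloading the source
Download the source from https://github.com/apache/zookeeper. Version 3.5.8 is recommended here.
If the build fails with a compile error about a missing Info type, create an interface named Info in the version package of the zookeeper-server module with the content below.
If the build succeeds without it, this step can be skipped.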
public interface Info {
    int MAJOR = 1;
    int MINOR = 0;
    int MICRO = 0;
    String QUALIFIER = null;
    int REVISION = -1;
    String REVISION_HASH = "1";
    String BUILD_DATE = "2022-09-03";
}
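2. ZooKeeper cluster election flow
2.1. Cluster startup and configuration loading
1. ZooKeeper is started through the ./zkServer.sh script.
The script uses the QuorumPeerMain class as the main class of the server process, so the analysis starts with this class.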
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
2. This class contains the main entry method.
@InterfaceAudience.Public
public class QuorumPeerMain {
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {
            ...
        }
    }
}
3. The key method is initializeAndRun. It parses the configuration file, starts a scheduled task that purges old data files, and then chooses between cluster mode and standalone mode.
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    // Parse the configuration file (zoo.cfg)
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start the scheduled task that purges old files from the data directories
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(), config.getDataLogDir(),
            config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.isDistributed()) {
        // Cluster (distributed) mode
        runFromConfig(config);
    } else {
        // Standalone mode
        ZooKeeperServerMain.main(args);
    }
}
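The choice between cluster mode and standalone mode can be illustrated with a small sketch. It assumes a simplified rule, namely that a configuration with more than one server.N entry counts as distributed; the real QuorumPeerConfig.isDistributed() takes more settings into account. QuorumConfigSketch and its methods are hypothetical names, and the host names are placeholders.

```java
// Hypothetical helper for illustration; this is not ZooKeeper's QuorumPeerConfig.
class QuorumConfigSketch {

    // Counts the "server.N=host:quorumPort:electionPort" entries in zoo.cfg-style text.
    static int countServers(String cfgText) {
        int servers = 0;
        for (String line : cfgText.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            if (trimmed.startsWith("server.") && trimmed.contains("=")) {
                servers++;
            }
        }
        return servers;
    }

    // Simplifying assumption: more than one server entry means cluster mode.
    static boolean isDistributed(String cfgText) {
        return countServers(cfgText) > 1;
    }

    public static void main(String[] args) {
        String cluster = "tickTime=2000\n"
                + "clientPort=2181\n"
                + "server.1=host1:2888:3888\n"
                + "server.2=host2:2888:3888\n"
                + "server.3=host3:2888:3888\n";
        String standalone = "tickTime=2000\nclientPort=2181\n";
        System.out.println("cluster config distributed: " + isDistributed(cluster));
        System.out.println("standalone config distributed: " + isDistributed(standalone));
    }
}
```

With a rule like this, a three-server configuration would go through runFromConfig, while a configuration without server entries would fall back to ZooKeeperServerMain.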
4. In cluster mode the flow continues in runFromConfig. It creates a connection factory (NIO or Netty) for client connections and copies the parsed configuration values into a QuorumPeer object.
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        if (config.getClientPortAddress() != null) {
            // Create the NIO or Netty connection factory
            cnxnFactory = ServerCnxnFactory.createFactory();
            // Listen on the client port
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
        }
        quorumPeer = getQuorumPeer();
        // Copy the parsed configuration values into the quorumPeer object
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        // Initialize the peer once its properties are set
        quorumPeer.initialize();
        // Start this node of the ZooKeeper cluster
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // join() was interrupted (handling abridged in this excerpt)
        LOG.warn("Quorum Peer interrupted", e);
    }
}
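5. Next comes the start method. It loads existing data from disk into memory, starts the connection factory, prepares the election, and then starts the QuorumPeer thread.
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Load the existing ZooKeeper data files (snapshots, for example)
    // from disk into memory
    loadDataBase();
    // Start the ServerCnxnFactory created earlier
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the leader election (initial vote and election algorithm)
    startLeaderElection();
    // Start the QuorumPeer thread; its run() method drives the election
    super.start();
}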
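6. A node can be in one of several states. Before the cluster has elected a leader, a node is in the LOOKING state. Once a leader has been elected, the leader is in the LEADING state and the followers are in the FOLLOWING state. Observers, which do not vote, are in the OBSERVING state.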
public enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING; }
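As a rough illustration of how these states relate to an election result, the sketch below maps a node to its next state once a leader id is known. PeerStateSketch and stateAfterElection are hypothetical names introduced for this example, and the rule is a simplification of what QuorumPeer does.

```java
// Hypothetical sketch; the enum mirrors ServerState from the excerpt above.
class PeerStateSketch {

    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // Returns the state a node would move to after a leader has been chosen.
    // Assumption: non-voting nodes become observers, the elected node leads,
    // and every other voter follows.
    static ServerState stateAfterElection(long myId, long leaderId, boolean isVoter) {
        if (!isVoter) {
            return ServerState.OBSERVING;
        }
        return (myId == leaderId) ? ServerState.LEADING : ServerState.FOLLOWING;
    }

    public static void main(String[] args) {
        long leaderId = 3L;
        for (long id = 1; id <= 3; id++) {
            System.out.println("server " + id + " -> " + stateAfterElection(id, leaderId, true));
        }
        System.out.println("observer 4 -> " + stateAfterElection(4L, leaderId, false));
    }
}
```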
2.2. Preparing for leader election
7. The next key method is startLeaderElection, which prepares the leader election.
synchronized public void startLeaderElection() {
    try {
        // The node is still looking for a leader
        if (getPeerState() == ServerState.LOOKING) {
            // Create the initial vote: the node votes for itself first
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // Create the election algorithm (FastLeaderElection by default)
    this.electionAlg = createElectionAlgorithm(electionType);
}
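The initial vote is a triple of server id, last logged zxid and epoch. The sketch below shows one way such votes can be ordered, following the comparison FastLeaderElection is understood to apply: higher epoch first, then higher zxid, then higher server id. VoteOrderSketch, its nested Vote class and beats are hypothetical names, not the ZooKeeper classes.

```java
// Hypothetical sketch of vote ordering; not the ZooKeeper Vote class.
class VoteOrderSketch {

    static final class Vote {
        final long leaderId;
        final long zxid;
        final long epoch;

        // Same argument order as new Vote(myid, zxid, epoch) in the excerpt above.
        Vote(long leaderId, long zxid, long epoch) {
            this.leaderId = leaderId;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    // Returns true when the candidate vote should replace the current one:
    // compare epoch first, then zxid, then server id.
    static boolean beats(Vote candidate, Vote current) {
        if (candidate.epoch != current.epoch) {
            return candidate.epoch > current.epoch;
        }
        if (candidate.zxid != current.zxid) {
            return candidate.zxid > current.zxid;
        }
        return candidate.leaderId > current.leaderId;
    }

    public static void main(String[] args) {
        Vote mine = new Vote(1L, 100L, 1L);
        Vote theirs = new Vote(2L, 100L, 1L);
        System.out.println("their vote wins: " + beats(theirs, mine));
    }
}
```

Under this ordering, two fresh nodes with the same epoch and zxid settle on the one with the larger server id.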
8. The election algorithm is created as follows. The argument passed in is 3 by default, so FastLeaderElection is the algorithm used by default. This branch initializes a QuorumCnxManager, which manages the election connections, starts a listener that accepts election connections over blocking I/O (BIO), and then creates and starts the election algorithm itself.
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // Create the QuorumCnxManager that manages election connections
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // The listener is a thread
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // Start the listener thread; it binds the election port and
            // accepts connections from other peers over blocking I/O
            listener.start();
            // Create and start the election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
9. The FastLeaderElection constructor is shown below. It mainly creates two LinkedBlockingQueue instances, sendqueue and recvqueue, which later hold the outgoing and incoming election messages.
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}
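The behaviour these queues rely on can be seen in isolation with a small sketch: offer adds a message without blocking, and a timed poll returns null when nothing arrives in time. ElectionQueueSketch and pollOnce are hypothetical names, and plain strings stand in for the ToSend and Notification messages.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the queue semantics; strings stand in for election messages.
class ElectionQueueSketch {

    // Returns the next message, or null if none arrives within timeoutMs.
    static String pollOnce(LinkedBlockingQueue<String> queue, long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<String>();
        System.out.println("empty queue: " + pollOnce(recvqueue, 10));
        recvqueue.offer("vote for server 3");
        System.out.println("after offer: " + pollOnce(recvqueue, 10));
    }
}
```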
10. Next, fle.start() is called. The main implementation behind it is as follows.
void start() {
    this.wsThread.start();
    this.wrThread.start();
}

Messenger(QuorumCnxManager manager) {
    // Thread that sends votes
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    // Thread that receives votes
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
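The sender side of this pattern can be sketched as a daemon thread that drains a queue. This is a minimal illustration under stated assumptions, not the WorkerSender implementation: SenderWorkerSketch, halt and runDemo are hypothetical names, and appending to a list stands in for the network send.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a queue-draining worker; not ZooKeeper's WorkerSender.
class SenderWorkerSketch implements Runnable {
    private final LinkedBlockingQueue<String> sendQueue;
    private final List<String> sent = new CopyOnWriteArrayList<String>();
    private volatile boolean stop = false;

    SenderWorkerSketch(LinkedBlockingQueue<String> sendQueue) {
        this.sendQueue = sendQueue;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // Wait briefly for a message so the stop flag is re-checked regularly
                String msg = sendQueue.poll(50, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    sent.add(msg); // stand-in for sending over the network
                }
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    void halt() {
        stop = true;
    }

    // Starts a daemon worker, enqueues the messages and returns what was processed.
    static List<String> runDemo(List<String> messages) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        SenderWorkerSketch worker = new SenderWorkerSketch(queue);
        Thread thread = new Thread(worker, "WorkerSenderSketch");
        thread.setDaemon(true);
        thread.start();
        for (String m : messages) {
            queue.offer(m);
        }
        long deadline = System.currentTimeMillis() + 5000;
        try {
            while (worker.sent.size() < messages.size()
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            worker.halt();
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.sent;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(java.util.Arrays.asList("vote-1", "vote-2")));
    }
}
```

Marking the thread as a daemon, as the Messenger constructor does, means the worker does not keep the JVM alive on its own.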
11. Back to super.start() from step 5. It starts the QuorumPeer thread, and the core of its run method is shown below. The loop checks the current state of the node, either LOOKING or one of the states entered after a successful election, and runs the matching logic.
try {
    while (running) {
        switch (getPeerState()) {
        // LOOKING is the initial state: the node is still searching for a leader
        case LOOKING:
            LOG.info("LOOKING");
            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                        } catch (Exception e) {
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    // Run the election; the vote returned by lookForLeader()
                    // becomes the current vote
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                observer.shutdown();
                setObserver(null);
                updateServerState();
            }
            break;
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                follower.shutdown();
                setFollower(null);
                updateServerState();
            }
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
        }
        start_fle = Time.currentElapsedTime();
    }
} finally {
    // (cleanup omitted in this excerpt)
}
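12. The current vote is set from the result of lookForLeader, shown below. This is the point where voting actually begins.
public Vote lookForLeader() throws InterruptedException {
    try {
        // Two maps: votes received in this election round, and votes
        // from peers that are no longer in the election
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized (this) {
            logicalclock.incrementAndGet();
            // Update the proposal with this node's own id, zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // Send the vote notifications
        sendNotifications();
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // Take the next received vote from the queue, waiting up to notTimeout ms
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            if (n == null) {
                // Nothing has arrived yet
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    // Open blocking socket connections to the other peers;
                    // every node opens its election sockets when it starts
                    manager.connectAll();
                }
                // Double the wait, capped at maxNotificationInterval
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                // A vote arrived and both sender and candidate are valid voters
                // (vote handling omitted in this excerpt)
            }
        }
        return null;
    } finally {
        // (cleanup omitted in this excerpt)
    }
}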
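The timeout handling in the loop above is an exponential backoff with a cap, which the sketch below isolates. NotificationBackoffSketch and nextTimeout are hypothetical names, and the constants 200 and 60000 are assumed to match finalizeWait and maxNotificationInterval in the 3.5.x source; check them against your checkout.

```java
// Hypothetical sketch of the capped backoff used while waiting for votes.
class NotificationBackoffSketch {
    // Assumed values; verify against FastLeaderElection in your source tree.
    static final int FINALIZE_WAIT = 200;
    static final int MAX_NOTIFICATION_INTERVAL = 60000;

    // Doubles the current timeout, never exceeding the maximum interval.
    static int nextTimeout(int current) {
        int doubled = current * 2;
        return (doubled < MAX_NOTIFICATION_INTERVAL) ? doubled : MAX_NOTIFICATION_INTERVAL;
    }

    public static void main(String[] args) {
        int timeout = FINALIZE_WAIT;
        for (int i = 0; i < 10; i++) {
            System.out.println("wait " + timeout + " ms");
            timeout = nextTimeout(timeout);
        }
    }
}
```

The effect is that an isolated node retries quickly at first and then settles at one attempt per maximum interval instead of flooding its peers.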
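At first no votes have been received, so the node sends out vote notifications. The implementation is as follows.
private void sendNotifications() {
    // Iterate over every server that can receive the message
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // Build a message carrying the current proposal, addressed to sid
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader, proposedZxid, logicalclock.get(),
                QuorumPeer.ServerState.LOOKING, sid, proposedEpoch,
                qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            // (debug logging omitted in this excerpt)
        }
        // Put the message on the in-memory send queue
        sendqueue.offer(notmsg);
    }
}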
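The fan-out in sendNotifications can be sketched as one queued message per voter, each carrying the same proposal. NotificationFanoutSketch, its nested ToSend class and enqueueFor are hypothetical names with fewer fields than the real message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the notification fan-out; not ZooKeeper's ToSend.
class NotificationFanoutSketch {

    static final class ToSend {
        final long targetSid;
        final long proposedLeader;
        final long proposedZxid;

        ToSend(long targetSid, long proposedLeader, long proposedZxid) {
            this.targetSid = targetSid;
            this.proposedLeader = proposedLeader;
            this.proposedZxid = proposedZxid;
        }
    }

    // Queues one message per voter, all carrying the same proposal.
    static LinkedBlockingQueue<ToSend> enqueueFor(List<Long> voters, long leader, long zxid) {
        LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>();
        for (long sid : voters) {
            sendqueue.offer(new ToSend(sid, leader, zxid));
        }
        return sendqueue;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<ToSend> queue = enqueueFor(Arrays.asList(1L, 2L, 3L), 1L, 100L);
        for (ToSend msg : queue) {
            System.out.println("to " + msg.targetSid + ": leader=" + msg.proposedLeader
                    + " zxid=" + msg.proposedZxid);
        }
    }
}
```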
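13. Socket connections between servers are established as shown below. A server with a larger sid connects to a server with a smaller sid, which avoids duplicate connections in both directions. One connection per pair is enough for the two sides to exchange messages.
private void handleConnection(Socket sock, DataInputStream din) {
    // Declarations added so the excerpt is complete
    Long sid = null, protocolVersion = null;
    InetSocketAddress electionAddr = null;
    try {
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) {
            // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                // Read the remote server's connection information
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                // The remote server's sid
                sid = init.sid;
                electionAddr = init.electionAddr;
            } catch (InitialMessage.InitialMessageException ex) {
                LOG.error("Initial message parsing error!", ex);
                closeSocket(sock);
                return;
            }
        }
    } catch (IOException e) {
        // Reading from the socket failed (handling abridged in this excerpt)
        closeSocket(sock);
        return;
    }
    // If the connecting server has a smaller sid, close this connection;
    // this server then opens the connection itself (omitted in this excerpt)
    if (sid < self.getId()) {
        closeSocket(sock);
    } else {
        // Thread that sends data
        SendWorker sw = new SendWorker(sock, sid);
        // Thread that receives data
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        // Register the sender for this connection in a map keyed by sid
        senderWorkerMap.put(sid, sw);
        // Create the outgoing message queue for this sid if it does not exist yet
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        sw.start();
        rw.start();
    }
}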
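The sid comparison can be checked in isolation. The sketch below applies the same test as the excerpt to both directions of a pair and shows that only the connection opened by the larger sid is kept. ConnectionRuleSketch, keepIncoming and survivingInitiator are hypothetical names.

```java
// Hypothetical sketch of the connection tie-break between two servers.
class ConnectionRuleSketch {

    // Mirrors the check in the excerpt: the accepting server closes the
    // connection when the remote sid is smaller than its own.
    static boolean keepIncoming(long remoteSid, long selfSid) {
        return !(remoteSid < selfSid);
    }

    // For a pair of distinct servers, returns the sid whose outgoing
    // connection is kept by the other side.
    static long survivingInitiator(long a, long b) {
        return keepIncoming(a, b) ? a : b;
    }

    public static void main(String[] args) {
        System.out.println("1 -> 3 kept: " + keepIncoming(1L, 3L));
        System.out.println("3 -> 1 kept: " + keepIncoming(3L, 1L));
        System.out.println("initiator for pair (1, 3): " + survivingInitiator(1L, 3L));
    }
}
```

Whichever side dials first, each pair of servers ends up with a single connection, initiated by the larger sid.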