深入理解 ZK集群的Leader选举(一)

简介: 深入理解 ZK集群的Leader选举(一)

前言#


ZooKeeper对Zab协议的实现有自己的主备模型,即Leader和learner(Observer + Follower),有如下几种情况需要进行领导者的选举工作

  • 情形1: 集群在启动的过程中,需要选举Leader
  • 情形2: 集群正常启动后,leader因故障挂掉了,需要选举Leader
  • 情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,选举新leader
  • 情景4: 集群正常运行,新增加1个Follower

本篇博文,从这四个方面进行源码的追踪阅读


程序入口#


QuorumPeer.java相当于集群中的每一个节点server,在它的start()方法中,完成当前节点的启动工作,源码如下:


// todo 进入了 QuorumPeer(意为仲裁人数)类中,可以把这个类理解成集群中的某一个点
    @Override
    public synchronized void start() {
        // todo 从磁盘中加载数据到内存中
        loadDataBase();
        // todo 启动上下文的这个工厂,他是个线程类, 接受客户端的请求
        cnxnFactory.start();
        // todo 开启leader的选举工作
        startLeaderElection();
        // todo 确定服务器的角色, 启动的就是当前类的run方法在900行
        super.start();
    }


第一个loadDataBase();目的是将数据从集群中恢复到内存中

第二个cnxnFactory.start();是当前的节点可以接受来自客户端(java代码,或者控制台)发送过来的连接请求

第三个startLeaderElection();开启leader的选举工作, 但是其实他是初始化了一系列的辅助类,用来辅助leader的选举,并非真正在选举

当前类,quorumPeer继承了ZKThread,它本身就是一个线程类,super.start();就是启动它的run方法,在他的Run方法中有一个while循环,一开始在程序启动的阶段,所有的节点的默认值都是Looking,于是会进入这个分支中,在这个分之中会进行真正的leader选举工作


小结#


从程序的入口介绍中,可以看出本篇文章在会着重看下startLeaderElection();做了哪些工作? 以及在looking分支中如何选举leader


情形1:集群在启动的过程中,选举新Leader#


进入startLeaderElection();方法,源码如下, 他主要做了两件事

  • 对本类QuorumPeer.java维护的变量(volatile private Vote currentVote;)初始化
  • createElectionAlgorithm()创建一个leader选举的方法


其实到现在,就剩下一个算法没过期了,就是fastLeaderElection


// TODO 开启投票选举Leader的工作
    synchronized public void startLeaderElection() {
      try {
          // todo 创建了一个封装了投票结果对象   包含myid 最大的zxid 第几轮Leader
        // todo 先投票给自己
            // todo 跟进它的构造函数
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
      } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
      }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        // todo  创建一个领导者选举的算法,这个算法还剩下一个唯一的实现 快速选举
        this.electionAlg = createElectionAlgorithm(electionType);
    }


继续跟进createElectionAlgorithm(electionType), 在这个方法中做了如下三件大事

  • 创建了QuorumCnxnManager
  • 创建Listenner
  • 创建FastLeaderElection


protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // todo 创建CnxnManager 上下文的管理器
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            // todo 在这里将listener 开启
            listener.start();
            // todo  实例化领导者选举的算法
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;


准备选举环境#


QuorumManager#



上图是QuorumCnxManager的类图,看一下,它有6个内部类, 其中的除了Message外其他都是可以单独运行的线程类


这个类有着举足轻重的作用,它是集群中全体节点共享辅助类, 那到底有什么作用呢? 我不卖关子直接说,因为leader的选举是通过投票决议出来的,既然要相互投票,那集群中的各个点就得两两之间建立连接,这个QuorumCnxManager就负责维护集群中的各个点的通信


它维护了两种队列,源码在下面,第一个队列被存入了ConcurrentHashMap中 key就是节点的myid(或者说是serverId),值可以理解成存储它往其他服务器发送投票的队列

第二个队列是收到的其他服务器发送过来的msg


// todo key=serverId(myid)   value = 保存着当前服务器向其他服务器发送消息的队列
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// todo 接收到的所有数据都在这个队列中
public final ArrayBlockingQueue<Message> recvQueue;



如上图是手绘的QuorumCnxManager.java的体系图,最直观的可以看到它内部的三条线程类,那三条线程类的run()方法又分别做了什么呢?


SendWorker的run(), 可以看到它根据sid取出了当前节点对应的队列,然后将队列中的数据往外发送


public void run() {
            threadCnt.incrementAndGet();
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq == null || isSendQueueEmpty(bq)) {
                   ByteBuffer b = lastMessageSent.get(sid);
                   if (b != null) {
                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
                       send(b);
                   }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
            try {
                while (running && !shutdown && sock != null) {
                    ByteBuffer b = null;
                    try {
                        // todo 取出任务所在的队列
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                        if (bq != null) {
                            // todo 将bq,添加进sendQueue
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }
                        if(b != null){
                            lastMessageSent.put(sid, b);
                            // todo
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }


RecvWorker的run方法,接受到了msg,然后将msg存入了recvQueue队列中


public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    // todo 从数组中把数据读取到数组中
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    // todo 将数组包装成ByteBuf
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    // todo 添加到RecvQueue中
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }


]


Listenner的run(),它会使用我们在配置文件中配置的集群键通信使用的端口(如上图的3888)建立彼此之间的连接

还能发现,集群中各个点之间的通信使用的传统socket通信


InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    // todo 创建serversocket
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        //todo 它取出来的地址就是address就是我们在配置文件中配置集群时添加进去的 port 3888...
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    // todo 绑定端口
                    ss.bind(addr);
                    while (!shutdown) {
                        // todo 阻塞接受其他的服务器发起连接
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                       // todo  如果启用了仲裁SASL身份验证,则异步接收和处理连接请求
                        // todo  这是必需的,因为sasl服务器身份验证过程可能需要几秒钟才能完成,这可能会延迟下一个对等连接请求。
                        if (quorumSaslAuthEnabled) {
                            // todo 异步接受一个连接
                            receiveConnectionAsync(client);
                        } else {
                            // todo 跟进这个方法
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    }


继续跟进源码,回到QuorumPeer.javacreateElectionAlgorithm()方法中,重新截取源码如下,完成了QuorumCnxManager的创建,后进行Listener的启动, Listenner的启动标记着集群中的各个节点之间有了两两之间建立通信能力, 同时Listenner是个线程类,它的Run()方法就在上面的代码中


FastLeaderElection#


启动Listenner之后, 开始实例化领导者选举的算法对象new FastLeaderElection(this, qcm)


...
     break;
        case 3:
            // todo 创建CnxnManager 上下文的管理器
            qcm = createCnxnManager();
   QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                // todo 在这里将listener 开启
                listener.start();
                // todo  实例化领导者选举的算法
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }


如下图是FasterElection的类图



直观的看到它三个直接内部类

  • Messager(它又有两个内部线程类)
  • WorkerRecriver
  • 负责将
  • WorkerSender
  • Notification
  • 一般是当新节点启动时状态为looking,然后发起投票决议,其他节点收到后会用Notification告诉它自己信任的leader
  • ToSend
  • 给对方发送,或者来自其他节点的消息。这些消息既可以是通知,也可以是接收通知的ack


对应着QuorumCnxManager维护的两种队列,FasterElection同样维护下面两个队列与之照应,一个是sendqueue另一个是recvqueue


LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;


具体怎么玩呢? 如下图



就是当节点启动过程中对外的投票会存入FasterElectionsendqueue,然后经过QuorumCnxManagersendWorker通过NIO发送出去, 与之相反的过程,收到的其他节点的投票会被QuorumCnxManagerrecvWorker收到,然后存入QuorumCnxManagerrecvQueue中,这个队列中的msg会继续被FasterElection的内部线程类workerRecviver取出存放到FasterElectionrecvqueue中

通过追踪代码,可以发现,Message的两个内部线程都被作为守护线程的方式开启


Messenger(QuorumCnxManager manager) {
    // todo WorderSender 作为一条新的线程
    this.ws = new WorkerSender(manager);
    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
    //todo------------------------------------
    // todo WorkerReceiver  作为一条新的线程
    this.wr = new WorkerReceiver(manager);
    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}


小结#


代码看到这里,其实选举leader的准备工作已经完成了,也就是说quorumPeer.javastart()方法中的startLeaderElection();已经准备领导选举的环境,就是上图

相关文章
|
3月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
Zookeeper Leader选举机制
Zookeeper Leader选举机制
87 0
|
API Apache
利用Zookeeper实现分布式应用的Leader选举
利用Zookeeper实现分布式应用的Leader选举
328 0
利用Zookeeper实现分布式应用的Leader选举
Zookeeper的Leader选举
Leader选举是保证分布式数据一致性的关键所在。Leader选举分为Zookeeper集群初始化启动时选举和Zookeeper集群运行期间Leader重新选举两种情况。在讲解Leader选举前先了解一下Zookeeper节点4种可能状态和事务ID概念。
143 0
Zookeeper的Leader选举
|
大数据 开发者
ZooKeeper 集群选举:非全新集群选举|学习笔记
快速学习 ZooKeeper 集群选举:非全新集群选举
224 0
|
算法
ZK源码阅读系列-ZK集群Leader选举解析
ZK服务端启动代码涉及很广,本文就集群下的zookeeper是怎么选举leader的进析。
220 0
ZK源码阅读系列-ZK集群Leader选举解析
|
分布式计算 资源调度 Hadoop
十、Zookeeper (leader)选举机制
十、Zookeeper (leader)选举机制
十、Zookeeper (leader)选举机制
|
消息中间件 算法 网络协议
【分布式】Zookeeper的Leader选举
前面学习了Zookeeper服务端的相关细节,其中对于集群启动而言,很重要的一部分就是Leader选举,接着就开始深入学习Leader选举。
167 0
【分布式】Zookeeper的Leader选举
|
算法 网络协议
【Zookeeper】源码分析之Leader选举(一)
分析完了Zookeeper中的网络机制后,接着来分析Zookeeper中一个更为核心的模块,Leader选举。
181 0
【Zookeeper】源码分析之Leader选举(一)