前言#
ZooKeeper对Zab协议的实现有自己的主备模型,即Leader和learner(Observer + Follower),有如下几种情况需要进行领导者的选举工作
- 情形1: 集群在启动的过程中,需要选举Leader
- 情形2: 集群正常启动后,leader因故障挂掉了,需要选举Leader
- 情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,选举新leader
- 情景4: 集群正常运行,新增加1个Follower
本篇博文,从这四个方面进行源码的追踪阅读
程序入口#
QuorumPeer.java
相当于集群中的每一个节点server,在它的start()
方法中,完成当前节点的启动工作,源码如下:
// todo 进入了 QuorumPeer(意为仲裁人数)类中,可以把这个类理解成集群中的某一个点 @Override public synchronized void start() { // todo 从磁盘中加载数据到内存中 loadDataBase(); // todo 启动上下文的这个工厂,他是个线程类, 接受客户端的请求 cnxnFactory.start(); // todo 开启leader的选举工作 startLeaderElection(); // todo 确定服务器的角色, 启动的就是当前类的run方法在900行 super.start(); }
第一个loadDataBase();
目的是将数据从集群中恢复到内存中
第二个cnxnFactory.start();
是当前的节点可以接受来自客户端(java代码,或者控制台)发送过来的连接请求
第三个startLeaderElection();
开启leader的选举工作, 但是其实他是初始化了一系列的辅助类,用来辅助leader的选举,并非真正在选举
当前类,quorumPeer
继承了ZKThread,它本身就是一个线程类,super.start();
就是启动它的run方法,在他的Run方法中有一个while循环,一开始在程序启动的阶段,所有的节点的默认值都是Looking
,于是会进入这个分支中,在这个分之中会进行真正的leader选举工作
小结#
从程序的入口介绍中,可以看出本篇文章在会着重看下startLeaderElection();
做了哪些工作? 以及在looking
分支中如何选举leader
情形1:集群在启动的过程中,选举新Leader#
进入startLeaderElection();
方法,源码如下, 他主要做了两件事
- 对本类
QuorumPeer.java
维护的变量(volatile private Vote currentVote;
)初始化 createElectionAlgorithm()
创建一个leader选举的方法
其实到现在,就剩下一个算法没过期了,就是
fastLeaderElection
// TODO 开启投票选举Leader的工作 synchronized public void startLeaderElection() { try { // todo 创建了一个封装了投票结果对象 包含myid 最大的zxid 第几轮Leader // todo 先投票给自己 // todo 跟进它的构造函数 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } if (electionType == 0) { try { udpSocket = new DatagramSocket(myQuorumAddr.getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } // todo 创建一个领导者选举的算法,这个算法还剩下一个唯一的实现 快速选举 this.electionAlg = createElectionAlgorithm(electionType); }
继续跟进createElectionAlgorithm(electionType)
, 在这个方法中做了如下三件大事
- 创建了
QuorumCnxnManager
- 创建
Listenner
- 创建
FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: // todo 创建CnxnManager 上下文的管理器 qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // todo 在这里将listener 开启 listener.start(); // todo 实例化领导者选举的算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break;
准备选举环境#
QuorumManager#
上图是QuorumCnxManager的类图,看一下,它有6个内部类, 其中的除了Message
外其他都是可以单独运行的线程类
这个类有着举足轻重的作用,它是集群中全体节点共享辅助类, 那到底有什么作用呢? 我不卖关子直接说,因为leader的选举是通过投票决议出来的,既然要相互投票,那集群中的各个点就得两两之间建立连接,这个QuorumCnxManager
就负责维护集群中的各个点的通信
它维护了两种队列,源码在下面,第一个队列被存入了ConcurrentHashMap
中 key就是节点的myid(或者说是serverId),值可以理解成存储它往其他服务器发送投票的队列
第二个队列是收到的其他服务器发送过来的msg
// todo key=serverId(myid) value = 保存着当前服务器向其他服务器发送消息的队列 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // todo 接收到的所有数据都在这个队列中 public final ArrayBlockingQueue<Message> recvQueue;
如上图是手绘的QuorumCnxManager.java
的体系图,最直观的可以看到它内部的三条线程类,那三条线程类的run()方法又分别做了什么呢?
SendWorker的run(), 可以看到它根据sid取出了当前节点对应的队列,然后将队列中的数据往外发送
public void run() { threadCnt.incrementAndGet(); try { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq == null || isSendQueueEmpty(bq)) { ByteBuffer b = lastMessageSent.get(sid); if (b != null) { LOG.debug("Attempting to send lastMessage to sid=" + sid); send(b); } } } catch (IOException e) { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } try { while (running && !shutdown && sock != null) { ByteBuffer b = null; try { // todo 取出任务所在的队列 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq != null) { // todo 将bq,添加进sendQueue b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { LOG.error("No queue of incoming messages for " + "server " + sid); break; } if(b != null){ lastMessageSent.put(sid, b); // todo send(b); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for message on queue", e); } }
RecvWorker的run方法,接受到了msg,然后将msg存入了recvQueue
队列中
public void run() { threadCnt.incrementAndGet(); try { while (running && !shutdown && sock != null) { /** * Reads the first int to determine the length of the * message */ int length = din.readInt(); if (length <= 0 || length > PACKETMAXSIZE) { throw new IOException( "Received packet with invalid packet: " + length); } /** * Allocates a new ByteBuffer to receive the message */ // todo 从数组中把数据读取到数组中 byte[] msgArray = new byte[length]; din.readFully(msgArray, 0, length); // todo 将数组包装成ByteBuf ByteBuffer message = ByteBuffer.wrap(msgArray); // todo 添加到RecvQueue中 addToRecvQueue(new Message(message.duplicate(), sid)); }
]
Listenner的run(),它会使用我们在配置文件中配置的集群键通信使用的端口(如上图的3888)建立彼此之间的连接
还能发现,集群中各个点之间的通信使用的传统socket通信
InetSocketAddress addr; while((!shutdown) && (numRetries < 3)){ try { // todo 创建serversocket ss = new ServerSocket(); ss.setReuseAddress(true); if (listenOnAllIPs) { int port = view.get(QuorumCnxManager.this.mySid) .electionAddr.getPort(); //todo 它取出来的地址就是address就是我们在配置文件中配置集群时添加进去的 port 3888... addr = new InetSocketAddress(port); } else { addr = view.get(QuorumCnxManager.this.mySid) .electionAddr; } LOG.info("My election bind port: " + addr.toString()); setName(view.get(QuorumCnxManager.this.mySid) .electionAddr.toString()); // todo 绑定端口 ss.bind(addr); while (!shutdown) { // todo 阻塞接受其他的服务器发起连接 Socket client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); // todo 如果启用了仲裁SASL身份验证,则异步接收和处理连接请求 // todo 这是必需的,因为sasl服务器身份验证过程可能需要几秒钟才能完成,这可能会延迟下一个对等连接请求。 if (quorumSaslAuthEnabled) { // todo 异步接受一个连接 receiveConnectionAsync(client); } else { // todo 跟进这个方法 receiveConnection(client); } numRetries = 0; }
继续跟进源码,回到QuorumPeer.java
的createElectionAlgorithm()
方法中,重新截取源码如下,完成了QuorumCnxManager
的创建,后进行Listener的启动, Listenner的启动标记着集群中的各个节点之间有了两两之间建立通信能力, 同时Listenner是个线程类,它的Run()方法就在上面的代码中
FastLeaderElection#
启动Listenner之后, 开始实例化领导者选举的算法对象new FastLeaderElection(this, qcm)
... break; case 3: // todo 创建CnxnManager 上下文的管理器 qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // todo 在这里将listener 开启 listener.start(); // todo 实例化领导者选举的算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); }
如下图是FasterElection
的类图
直观的看到它三个直接内部类
- Messager(它又有两个内部线程类)
- WorkerRecriver
- 负责将
- WorkerSender
- Notification
- 一般是当新节点启动时状态为looking,然后发起投票决议,其他节点收到后会用
Notification
告诉它自己信任的leader
- ToSend
- 给对方发送,或者来自其他节点的消息。这些消息既可以是通知,也可以是接收通知的ack
对应着QuorumCnxManager
维护的两种队列,FasterElection
同样维护下面两个队列与之照应,一个是sendqueue
另一个是recvqueue
LinkedBlockingQueue<ToSend> sendqueue; LinkedBlockingQueue<Notification> recvqueue;
具体怎么玩呢? 如下图
就是当节点启动过程中对外的投票会存入FasterElection
的sendqueue
,然后经过QuorumCnxManager
的sendWorker
通过NIO发送出去, 与之相反的过程,收到的其他节点的投票会被QuorumCnxManager
的recvWorker
收到,然后存入QuorumCnxManager
的recvQueue中
,这个队列中的msg会继续被FasterElection
的内部线程类workerRecviver
取出存放到FasterElection
的recvqueue中
通过追踪代码,可以发现,Message的两个内部线程都被作为守护线程的方式开启
Messenger(QuorumCnxManager manager) { // todo WorderSender 作为一条新的线程 this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); //todo------------------------------------ // todo WorkerReceiver 作为一条新的线程 this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }
小结#
代码看到这里,其实选举leader的准备工作已经完成了,也就是说quorumPeer.java
的start()
方法中的startLeaderElection();
已经准备领导选举的环境,就是上图