ZooKeeper源码研究系列(4)集群版服务器介绍

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

1 系列目录

2 集群版服务器启动过程

启动类是org.apache.zookeeper.server.quorum.QuorumPeerMain,启动参数就是配置文件的地址

2.1 配置文件说明

来看下一个简单的配置文件内容:

tickTime=4000
initLimit=10
syncLimit=5
dataDir=D:\\zk-test\\datadir\\server1
clientPort=2181
maxClientCnxns=60

server.1=localhost:2881:3881
server.2=localhost:2882:3882
server.3=localhost:2883:3883
  • tickTime值,单位ms,默认3000

    • 用途1:用于指定session检查的间隔

      服务器会每隔一段时间检查一次连接它的客户端的session是否过期。该间隔就是tickTime。

    • 用途2:用于给出默认的minSessionTimeout和maxSessionTimeout

      如果没有给出maxSessionTimeout和minSessionTimeout(为-1),则minSessionTimeout和maxSessionTimeout的取值如下:

      minSessionTimeout == -1 ? tickTime 2 : minSessionTimeout; maxSessionTimeout == -1 ? tickTime 20 : maxSessionTimeout;

      分别是tickTime的2倍和20倍。

      客户端代码在创建ZooKeeper对象的时候会给出一个sessionTimeout时间,而上述的minSessionTimeout和maxSessionTimeout就是用来约束客户端的sessionTimeout

    • 用途3:作为initLimit和syncLimit时间的基数,见下面

  • initLimit:在初始化阶段和Leader的通信的读取超时时间,即当调用socket的InputStream的read方法时最大阻塞时间不能超过initLimit*tickTime。设置如下:

    initLimit读取超时时间

    initLimit还会作为初始化阶段收集相关响应的总时间,一旦超过该时间,还没有过半的机器进行响应,则抛出InterruptedException的timeout异常

  • syncLimit:在初始化阶段之后的请求阶段和Leader通信的读取超时时间,即对Leader的一次请求到响应的总时间不能超过syncLimit*tickTime时间。Follower和Leader之间的socket的超时时间初始化阶段是前者,当初始化完毕又设置到后者时间上。设置如下:

    syncLimit读取超时时间

    syncLimit还会作为与Leader的连接超时时间,如下:

    与Leader的连接超时时间

  • dataDir:用于存储数据快照的目录

  • dataLogDir:用于存储事务日志的目录,如果没有指定,则和dataDir保持一致

  • clientPort:对客户端暴漏的连接端口

  • maxClientCnxns值,用于指定服务器端最大的连接数。

  • 集群的server配置,一种格式为server.A=B:C:D,还有其他格式,具体可以去看QuorumPeerConfig源码解析这一块

    A:即为集群中server的id标示,很多地方用到它,如选举过程中,就是通过id来识别是哪台服务器的投票。如初始化sessionId的时候,也用到id来防止sessionId出现重复。

    B:即该服务器的host地址或者ip地址。

    C:一旦Leader选举成功后,非Leader服务器都要和Leader服务器建立tcp连接进行通信。该通信端口即为C

    D:在Leader选举阶段,每个服务器之间相互连接(上述serverId大的会主动连接serverId小的server),进行投票选举的事宜,D即为投票选举时的通信端口

    上述配置是每台服务器都要知道的集群配置,同时要求在dataDir目录中创建一个myid文件,里面写上上述serverid中的一个id值,即表明某台服务器所属的编号

2.2 集群版服务器启动概述

我们由前一篇文章知道了,单机版的服务器启动,就是创建了一个ZooKeeperServer对象。我们需要再次熟悉下ZooKeeperServer的类图,如下:

输入图片说明

可见Leader服务器要使用LeaderZooKeeperServer,Follower服务器要使用FollowerZooKeeperServer。而集群版服务器启动后,可能是Leader或者Follower。在运行过程中角色还会进行自动更换,即自动更换使用不同的ZooKeeperServer子类。此时就需要一个代理对象,用于角色的更换、所使用的ZooKeeperServer的子类的更换。这就是QuorumPeer,如下图

输入图片说明

这里面很多的配置属性都交给了QuorumPeer,由它传递给底层所使用的ZooKeeperServer子类。

来详细看看这些配置属性:

  • ServerCnxnFactory cnxnFactory:负责和客户端建立连接和通信

  • FileTxnSnapLog logFactory:通过dataDir和dataLogDir目录,用于事务日志记录和内存DataTree和session数据的快照。

  • Map<Long, QuorumServer> quorumPeers:QuorumServer包含ip、和Leader通信端口、选举端口即上述server.A=B:C:D的内容。而这里的key则是A,即server的id。这里的server不包含observers,即这里的server都是要参与投票的。

  • int electionType:选举算法的类型。默认是3,采用的是FastLeaderElection选举算法。如下图

    创建选举算法的过程

    目前前三种选举算法都被标记为过时了,只保留了最后一种选举算法。具体的选举过程,后面单独拿出一篇博客来分析。目前的首要目标是把集群的启动过程简单弄清楚,然后理解在集群时,如何来处理请求的整个过程。

  • long myid:就是本机器配置的id,即myid文件中写入的数字。

  • int tickTime、minSessionTimeout、maxSessionTimeout:这几个参数在单机版的时候都讲过了。

  • int initLimit、syncLimit:上面已经详细描述过了

  • QuorumVerifier quorumConfig:用于验证是否过半机器已经认同了。默认采用的是QuorumMaj,即最简单的数量过半即可,不考虑权重问题

  • ZKDatabase zkDb:即该服务器的内存数据库,最终还是会传递给ZooKeeperServer的子类。

  • LearnerType:就两种,PARTICIPANT, OBSERVER。PARTICIPANT参与投票,可能成为Follower,也可能成为Leader。OBSERVER不参与投票,角色不会改变。

然后就是启动QuorumPeer,之后阻塞主线程,启动过程如下:

QuorumPeer启动过程

主要分成4大步:

  • loadDataBase():从事务日志目录dataLogDir和数据快照目录dataDir中恢复出DataTree数据

  • cnxnFactory.start():开启对客户端的连接端口

  • startLeaderElection():创建出选举算法

  • super.start():启动QuorumPeer线程,在该线程中进行服务器状态的检查

不再重点说明选举过程,后面会专门抽出一篇博客来详细说明选举过程。

QuorumPeer本身继承了Thread,在run方法中不断的检测当前服务器的状态,即QuorumPeer的ServerState state属性。ServerState枚举内容如下:

public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}
  • LOOKING:即该服务器处于Leader选举阶段

  • FOLLOWING:即该服务器作为一个Follower

  • LEADING:即该服务器作为一个Leader

  • OBSERVING:即该服务器作为一个Observer

在QuorumPeer的线程中操作如下:

  • 服务器的状态是LOOKING,则根据之前创建的选举算法,执行选举过程

    选举过程一直阻塞,直到完成选举。完成选举后,各自的服务器根据投票结果判定自己是不是被选举成Leader了,如果不是则状态改变为FOLLOWING,如果是Leader,则状态改变为LEADING。

  • 服务器的状态是LEADING:则会创建出LeaderZooKeeperServer服务器,然后封装成Leader,调用Leader的lead()方法,也是阻塞方法,只有当该Leader挂了之后,才去执行下setLeader(null)并重新回到LOOKING的状态

    LEADING状态的操作

  • 服务器的状态是FOLLOWING:则会创建出FollowerZooKeeperServer服务器,然后封装成Follower,调用follower的followLeader()方法,也是阻塞方法,只有当该集群中的Leader挂了之后,才去执行下setFollower(null)并重新回到LOOKING的状态

    FOLLOWING状态的操作

下面就来详细的看看各个角色的启动过程:

2.3 Leader和Follower启动过程

首先是根据已有的配置信息创建出LeaderZooKeeperServer:

LeaderZooKeeperServer的创建

然后就是封装成Leader对象

封装成Leader对象

Leader和LeaderZooKeeperServer各自的职责是什么呢?

我们知道单机版使用的ZooKeeperServer不需要处理集群版中Follower与Leader之间的通信。ZooKeeperServer最主要的就是RequestProcessor处理器链、ZKDatabase、SessionTracker(只是实现不一样)。这几部分是单机版和集群版服务器都共通的,主要不同的地方就是RequestProcessor处理器链的不同。所以LeaderZooKeeperServer、FollowerZooKeeperServer和ZooKeeperServer最主要的区别就是RequestProcessor处理器链。

集群版还要负责处理Follower与Leader之间的通信,所以需要在LeaderZooKeeperServer和FollowerZooKeeperServer之外加入这部分内容。所以就有了Leader对LeaderZooKeeperServer等封装,Follower对FollowerZooKeeperServer的封装。前者加上加入ServerSocket负责等待Follower的socket连接,后者加入Socket负责去连接Leader。

看下Leader处理socket连接的过程:

Leader处理socket连接的过程

可以看到每来一个其他ZooKeeper服务器的socket连接,就会创建一个LearnerHandler,具体的处理逻辑就全部交给LearnerHandler了

然后在LearnerHandler中就开始了Leader和Follower或者Observer的初始化同步过程,这个过程之后详细讲解。完成同步之后,LearnerHandler就进行循环过程,不断的读取来自Follower或者Observer的数据包,如下:

while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");

    switch (qp.getType()) {
    case Leader.ACK:          
        break;
    case Leader.PING:          
        break;
    case Leader.REVALIDATE:        
        break;
    case Leader.REQUEST:                          
        break;
    default:
        LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
        break;
    }
}

LearnerHandler会接收来自Follower或者Observer的PING、Request请求等。PING请求,则需要重新计算所传递过来的sessionId的过期时间。事务请求则需要Follower或者Observer转发给Leader,该事务请求就是Leader.REQUEST类型。

同时Follower也在不断接收来自Leader的数据包,处理如下:

Follower处理与Leader的通信

Leader在开启与Follower或者Observer同步的时候,同时在启动了本身的RequestProcessor处理器链,如下:

Leader的RequestProcessor处理器链

PrepRequestProcessor-》ProposalRequestProcessor-》CommitProcessor-》ToBeAppliedRequestProcessor-》FinalRequestProcessor

ProposalRequestProcessor-》SyncRequestProcessor-》AckRequestProcessor

再来看看Follower的处理器链

Follower的RequestProcessor处理器链

FollowerRequestProcessor-》CommitProcessor-》FinalRequestProcessor

SyncRequestProcessor-》SendAckRequestProcessor

接下来就是需要详细的看看这些处理器链

2.4 Leader和Follower的RequestProcessor处理器链

2.4.1 Follower的FollowerRequestProcessor处理器

先来看下具体的处理过程:

FollowerRequestProcessor处理过程

对于一个请求,先交给下一个处理器来处理,如果请求是事务请求,还要将该请求转发给Leader。zks.getFollower().request(request)即通过上述Leader与Follower的tcp连接发送给Leader,最终会在上述LearnerHandler中出现。

由于FollowerRequestProcessor的下一个处理器是CommitProcessor(是一个线程),nextProcessor.processRequest(request)这个操作仅仅是把request放入等待处理的队列中,然后就返回了,执行下面的代码,将事务请求转发给Leader。

2.4.2 Follower的CommitProcessor处理器

FollowerRequestProcessor把请求交给了CommitProcessor,看下CommitProcessor的整个处理流程

CommitProcessor的属性

CommitProcessor有三个重要属性:

  • LinkedList<Request> queuedRequests:用于存放FollowerRequestProcessor提交的请求

  • LinkedList<Request> committedRequests:用于存放Leader对该Follower下达的commit请求。我们知道一旦是事务请求就会转发给Leader,需要Leader把这个请求下发给所有的Follower进行投票,如果过半数达成一致,才认为该请求可以通过,即是可以commit的请求,此时Leader又会下发commit命令,让Follower去执行commit。Follower就是在上述Follower与Leader通信模块中接收到该请求,然后存放至CommitProcessor的committedRequests中的。

  • ArrayList<Request> toProcess:需要被下一个处理器处理的请求,可以来自committedRequests,如果是非事务请求不会经过committedRequests,直接到达toProcess中。

具体的处理逻辑如下:

CommitProcessor处理逻辑1

CommitProcessor处理逻辑2

先解释下nextPending:即等待被处理的事务请求,注意一定是事务请求。

  • 第一步:先把toProcess中的请求交给下一个处理器来处理,然后清空toProcess

  • 第二步:如果queuedRequests列表为空,但是当前有等待被处理的事务。即是这样的场景:FollowerRequestProcessor向queuedRequests中提交了一个事务请求,然后改事务请求作为了下一个等待被处理的事务请求即nextPending,然后从queuedRequests中删除,同时FollowerRequestProcessor将该事务请求转发给Leader,但是此时Leader还没有判定该请求是否能够被提交,即还未向Follower发送commit该请求的操作,即CommitProcessor中的committedRequests为空,此时Follower要做的事情就是等待Leader向它的committedRequests中发送判定结果。

  • 第三步:如果CommitProcessor中的committedRequests不为空,即Leader向该Follower发送了相关的提交请求。则拿出第一个需要commit的请求,验证下当前需要被处理的事务请求是不是和刚才拿出的请求是不是同一个请求,如果是同一个请求,则替换nextPending中的部分数据,同时存放至toProcess,等待被下一个处理器来处理。同时交出nextPending位置,等待下一个事务请求来占用。

  • 第四步:如果和nextPending不是同一个请求,则直接存放至toProcess,等待被下一个处理器来处理。其他一切不变

  • 第五步:判断当前是否有等待被处理的事务请求,即nextPending是否为空,如果不为空则continue,不执行下面的第6步和第7步操作。是为了保证请求都能够被顺序处理,前面一个没处理完,后面的请求不能被处理

  • 第六步:能够走到第6步,说明nextPending已经为null了,然后从queuedRequests中取出一个请求,如果是事务请求,则把nextPending的位置占住

  • 第七步:如果不是事务请求,则直接存放至toProcess,等待被下一个处理器来处理

其实到这里,一旦是事务请求,就会被阻塞在这里,等待Leader的决定。那接下来我们就去看看Leader是如何处理Follower转发过来的请求的。我们就以创建session为例。

上面说过了,Leader会为每一个Follower创建一个LearnerHandler,来处理与该Follower的通信,对于Follower转发的请求,处理如下:

LearnerHandler接收到来自对应Follower的请求

先还原成一个Request,然后为该Request设置owner,this即LearnerHandler。然后就把该请求交给了Leader的请求处理器链,Leader的第一个请求处理器是PrepRequestProcessor

2.4.3 Leader的PrepRequestProcessor处理器

这个处理器我们在ZooKeeper单机版的时候详细讲解过了,见PrepRequestProcessor处理器

这里需要提前说明下,当客户端发送请求给Follower的时候,这时候先使用Follower的LearnerSessionTracker为该创建session的请求分配了sessionId,同时给定了sessionTimeout时间。但是并没有在Follower本地保留任何信息。

来看下PrepRequestProcessor对于创建session的处理

PrepRequestProcessor对创建session的处理

根据请求的sessionId和sessionTimeout时间在Leader中使用SessionTrackerImpl创建出一个session。这一部分在单机版的时候已经详细描述过了,这里不再说明。然后设置该session的owner是上述提到过的LearnerHandler,代表了某个Follower。

至此就在Leader中创建出了session。

然后继续下一个处理器ProposalRequestProcessor

2.4.4 Leader的ProposalRequestProcessor处理器

处理过程如下:

ProposalRequestProcessor处理过程

  • 第一步:交给下一个处理器来处理,下一个处理器是CommitProcessor,我们知道会阻塞在那里
  • 第二步:如果请求是事务请求的话,则根据该请求创建出一份议案,发给所有的Learner(即Follower和Observer)
  • 第三步:同时记录该事务请求到事务日志文件中

来详细看下第二步如何发出议案:

根据请求发出议案

可以看到就是根据request的内容,构建了一个QuorumPacket包,然后发送给所有的Follower,同时对QuorumPacket进行包装,创建出Proposal议案,存至ConcurrentMap<Long, Proposal> outstandingProposals结构中,key就是请求的zxid。

Follower接收到之后该如何处理呢?

Follower处理来自Leader的Proposal类型的数据包

logRequest内容

可以看到Follower的处理就是把该请求交给了SyncRequestProcessor,它会把事务请求记录到日志中去,同时交给下一个处理器来处理。Follower的SyncRequestProcessor下一个处理器是SendAckRequestProcessor,来看下它又是如何来处理的呢?

Follower的SendAckRequestProcessor

仅仅就是向Leader回复一个Leader.ACK的响应包,表明本Follower已完成事务请求的记录。

再回到Leader的ProposalRequestProcessor处理器,然后来详细看下上述第三步中记录到事务日志文件中的内容:

同样是利用SyncRequestProcessor把事务请求记录到事务日志文件中,然后交给下一个处理器来处理,Leader的下一个处理AckRequestProcessor,如下:

Leader的ACK响应

其他的Follower在记录完事务请求后,都使用SendAckRequestProcessor向Leader发送一个应答响应,Leader自己在记录完事务请求后,也需要一个应答,只是不用发送数据包了,直接调用,响应方法leader.processAck,其实Leader在接收到其他Follower发送的Leader.ACK的响应包,也会调用该方法进行处理,如下:

LearnerHandler对ACK的处理

具体的处理过程就是:

Leader处理ACK响应的过程

  • 第一步:根据zxid从Leader的ConcurrentMap<Long, Proposal> outstandingProposals取出议案

  • 第二步:记录该议案的已经响应的Follower数量

  • 第三步:然后判决数量是不是已经过半?

  • 第四步:如果过半,则将该决议存至ConcurrentLinkedQueue<Proposal> toBeApplied结构中,作为历史备份,一旦某个决议被真正执行了,就从中删除。

  • 第五、六步:向所有的Follower和Observer发送一个commit请求包,因为此时Follower和Observer都处于阻塞状态(也不一定)等待Leader的commit数据包。两者的主要区别是,Leader之前已经向Follower发送过请求的具体内容,这次commit就不需要再完整的发送整个请求内容了,而Observer之前没有收到这个提案,不知道有这个请求,所以需要把整个请求数据全发给Observer。

  • 第七步:由于Leader本身也阻塞在CommitProcessor,所以需要给自己的CommitProcessor中加入之前的请求。

对于连接Follower创建session来说,Leader发送的Commit请求到达Follower之后,该请求就可以在Follower中继续走下去,即走到了Follower的最后一个处理器FinalRequestProcessor,之前就详细说过了

对客户端的创建session请求进行响应

Follower对客户端创建session的请求执行上述响应,从而整个集群版的连接过程就建立起来了。

再说说具体的细节问题,其他的Follower和Observer都接收到了Leader发来的commit请求,他们该如何来处理呢?对于创建session来说,他们其实都不需要做什么,那这个request又是如何被处理掉的呢?

处理不属于自己的请求

在FinalRequestProcessor中会做这样的一个判断,即判断该请求是否有ServerCnxn,如果是客户端连接的那台Follower,必然会有ServerCnxn,而其他Follower和Observer接收到的请求是从Leader过来的,就没有ServerCnxn,所以就被过滤掉了,不用执行。

2.4.5 Leader的ToBeAppliedRequestProcessor处理器

Leader的ToBeAppliedRequestProcessor处理器

就是把请求交给下一个处理器即FinalRequestProcessor,同时从之前的决议队列中取出然后删除。

3 结束语

本篇文章贴代码形式地介绍了Leader和Follower的请求处理器以及他们通信的过程。下一篇文章就纯理论地介绍如下过程

  • 1 连接Leader建立session关联的过程,以及session不断激活的过程
  • 2 连接Follower建立session关联的过程,以及session不断激激活的过程
  • 3 连接Observer建立session关联的过程,以及session不断激激活的过程
  • 4 连接Leader,setData的过程
  • 5 连接Follower,setData的过程
  • 6 连接Observer,setData的过程
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
SQL 机器学习/深度学习 分布式计算
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
60 1
|
14天前
|
PHP 数据库 数据安全/隐私保护
布谷直播源码部署服务器关于数据库配置的详细说明
布谷直播系统源码搭建部署时数据库配置明细!
|
17天前
|
NoSQL 应用服务中间件 PHP
布谷一对一直播源码服务器环境配置及app功能
一对一直播源码阿里云服务器环境配置及要求
|
23天前
|
NoSQL Linux PHP
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
61 6
|
1月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
41 4
|
1月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
69 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
32 3
|
1月前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
57 3