【Zookeeper】源码分析之服务器(二)之ZooKeeperServer

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前面阐述了服务器的总体框架,下面来分析服务器的所有父类ZooKeeperServer。

一、前言


  前面阐述了服务器的总体框架,下面来分析服务器的所有父类ZooKeeperServer。


二、ZooKeeperServer源码分析


  2.1 类的继承关系 

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}

说明:ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定义了expire方法(表示会话过期)和getServerId方法(表示获取服务器ID),而Provider则主要定义了获取服务器某些数据的方法。

  2.2 类的内部类

  1. DataTreeBuilder类

  public interface DataTreeBuilder {
        // 构建DataTree
        public DataTree build();
    }

说明:其定义了构建树DataTree的接口。

  2. BasicDataTreeBuilder类 

 static public class BasicDataTreeBuilder implements DataTreeBuilder {
        public DataTree build() {
            return new DataTree();
        }
    }

 说明:实现DataTreeBuilder接口,返回新创建的树DataTree。

  3. MissingSessionException类

 public static class MissingSessionException extends IOException {
        private static final long serialVersionUID = 7467414635467261007L;
        public MissingSessionException(String msg) {
            super(msg);
        }
    }

说明:表示会话缺失异常。

  4. ChangeRecord类 

 static class ChangeRecord {
        ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
                List<ACL> acl) {
            // 属性赋值
            this.zxid = zxid;
            this.path = path;
            this.stat = stat;
            this.childCount = childCount;
            this.acl = acl;
        }
        // zxid
        long zxid;
        // 路径
        String path;
        // 统计数据
        StatPersisted stat; /* Make sure to create a new object when changing */
        // 子节点个数
        int childCount;
        // ACL列表
        List<ACL> acl; /* Make sure to create a new object when changing */
        @SuppressWarnings("unchecked")
        // 拷贝
        ChangeRecord duplicate(long zxid) {
            StatPersisted stat = new StatPersisted();
            if (this.stat != null) {
                DataTree.copyStatPersisted(this.stat, stat);
            }
            return new ChangeRecord(zxid, path, stat, childCount,
                    acl == null ? new ArrayList<ACL>() : new ArrayList(acl));
        }
    }

 说明:ChangeRecord数据结构是用于方便PrepRequestProcessor和FinalRequestProcessor之间进行信息共享,其包含了一个拷贝方法duplicate,用于返回属性相同的ChangeRecord实例。

  2.3 类的属性  

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    // 日志器
    protected static final Logger LOG;
    static {
        // 初始化日志器
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
        Environment.logEnv("Server environment:", LOG);
    }
    // JMX服务
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;
    // 默认心跳频率
    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    /** value of -1 indicates unset, use default */
    // 最小会话过期时间
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    // 最大会话过期时间
    protected int maxSessionTimeout = -1;
    // 会话跟踪器
    protected SessionTracker sessionTracker;
    // 事务日志快照
    private FileTxnSnapLog txnLogFactory = null;
    // Zookeeper内存数据库
    private ZKDatabase zkDb;
    // 
    protected long hzxid = 0;
    // 异常
    public final static Exception ok = new Exception("No prob");
    // 请求处理器
    protected RequestProcessor firstProcessor;
    // 运行标志
    protected volatile boolean running;
    /**
     * This is the secret that we use to generate passwords, for the moment it
     * is more of a sanity check.
     */
    // 生成密码的密钥
    static final private long superSecret = 0XB3415C00L;
    // 
    int requestsInProcess;
    // 未处理的ChangeRecord
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    // this data structure must be accessed under the outstandingChanges lock
    // 记录path对应的ChangeRecord
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();
    // 连接工厂
    private ServerCnxnFactory serverCnxnFactory;
    // 服务器统计数据
    private final ServerStats serverStats;
}

说明:类中包含了心跳频率,会话跟踪器(处理会话)、事务日志快照、内存数据库、请求处理器、未处理的ChangeRecord、服务器统计信息等。

  2.4 类的构造函数

  1. ZooKeeperServer()型构造函数

 public ZooKeeperServer() {
        serverStats = new ServerStats(this);
    }

说明:其只初始化了服务器的统计信息。

  2. ZooKeeperServer(FileTxnSnapLog, int, int, int, DataTreeBuilder, ZKDatabase)型构造函数 

public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout,
            DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
        // 给属性赋值
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        this.minSessionTimeout = minSessionTimeout;
        this.maxSessionTimeout = maxSessionTimeout;
        LOG.info("Created server with tickTime " + tickTime
                + " minSessionTimeout " + getMinSessionTimeout()
                + " maxSessionTimeout " + getMaxSessionTimeout()
                + " datadir " + txnLogFactory.getDataDir()
                + " snapdir " + txnLogFactory.getSnapDir());
    }

说明:该构造函数会初始化服务器统计数据、事务日志工厂、心跳时间、会话时间(最短超时时间和最长超时时间)。

  3. ZooKeeperServer(FileTxnSnapLog, int, DataTreeBuilder)型构造函数  

 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            DataTreeBuilder treeBuilder) throws IOException {
        this(txnLogFactory, tickTime, -1, -1, treeBuilder,
                new ZKDatabase(txnLogFactory));
    }

 说明:其首先会生成ZooKeeper内存数据库后,然后调用第二个构造函数进行初始化操作。

  4. ZooKeeperServer(File, File, int)型构造函数 

public ZooKeeperServer(File snapDir, File logDir, int tickTime)
            throws IOException {
        this( new FileTxnSnapLog(snapDir, logDir),
                tickTime, new BasicDataTreeBuilder());
    }

说明:其会调用同名构造函数进行初始化操作。

  5. ZooKeeperServer(FileTxnSnapLog, DataTreeBuilder)型构造函数 

 public ZooKeeperServer(FileTxnSnapLog txnLogFactory,
            DataTreeBuilder treeBuilder)
        throws IOException
    {
        this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, treeBuilder,
                new ZKDatabase(txnLogFactory));
    }

 说明:其生成内存数据库之后再调用同名构造函数进行初始化操作。

  2.5 核心函数分析

  1. loadData函数 

public void loadData() throws IOException, InterruptedException {
        /*
         * When a new leader starts executing Leader#lead, it 
         * invokes this method. The database, however, has been
         * initialized before running leader election so that
         * the server could pick its zxid for its initial vote.
         * It does it by invoking QuorumPeer#getLastLoggedZxid.
         * Consequently, we don't need to initialize it once more
         * and avoid the penalty of loading it a second time. Not 
         * reloading it is particularly important for applications
         * that host a large database.
         * 
         * The following if block checks whether the database has
         * been initialized or not. Note that this method is
         * invoked by at least one other method: 
         * ZooKeeperServer#startdata.
         *  
         * See ZOOKEEPER-1642 for more detail.
         */
        if(zkDb.isInitialized()){ // 内存数据库已被初始化
            // 设置为最后处理的Zxid
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else { // 未被初始化,则加载数据库
            setZxid(zkDb.loadDataBase());
        }
        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) { // 遍历所有的会话
            if (zkDb.getSessionWithTimeOuts().get(session) == null) { // 删除过期的会话
                deadSessions.add(session);
            }
        }
        // 完成DataTree的初始化
        zkDb.setDataTreeInit(true);
        for (long session : deadSessions) { // 遍历过期会话
            // XXX: Is lastProcessedZxid really the best thing to use?
            // 删除会话
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }
    }

 说明:该函数用于加载数据,其首先会判断内存库是否已经加载设置zxid,之后会调用killSession函数删除过期的会话,killSession会从sessionTracker中删除session,并且killSession最后会调用DataTree的killSession函数,其源码如下

 void killSession(long session, long zxid) {
        // the list is already removed from the ephemerals
        // so we do not have to worry about synchronizing on
        // the list. This is only called from FinalRequestProcessor
        // so there is no need for synchronization. The list is not
        // changed here. Only create and delete change the list which
        // are again called from FinalRequestProcessor in sequence.
        // 移除session,并获取该session对应的所有临时节点
        HashSet<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) { // 遍历所有临时节点
                try {
                    // 删除路径对应的节点
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        LOG
                                .debug("Deleting ephemeral node " + path
                                        + " for session 0x"
                                        + Long.toHexString(session));
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path
                            + " while removing ephemeral for dead session 0x"
                            + Long.toHexString(session));
                }
            }
        }
    }

说明:DataTree的killSession函数的逻辑首先移除session,然后取得该session下的所有临时节点,然后逐一删除临时节点。

  2. submit函数 

  public void submitRequest(Request si) {
        if (firstProcessor == null) { // 第一个处理器为空
            synchronized (this) {
                try {
                    while (!running) { // 直到running为true,否则继续等待
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            // 是否为合法的packet
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) { 
                // 处理请求
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

说明:当firstProcessor为空时,并且running标志为false时,其会一直等待,直到running标志为true,之后调用touch函数判断session是否存在或者已经超时,之后判断请求的类型是否合法,合法则使用请求处理器进行处理。

  3. processConnectRequest函数 

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        // 反序列化
        connReq.deserialize(bia, "connect");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Session establishment request from client "
                    + cnxn.getRemoteSocketAddress()
                    + " client's lastZxid is 0x"
                    + Long.toHexString(connReq.getLastZxidSeen()));
        }
        boolean readOnly = false;
        try {
            // 是否为只读
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
            // this is ok -- just a packet from an old client which
            // doesn't contain readOnly field
            LOG.warn("Connection request from old client "
                    + cnxn.getRemoteSocketAddress()
                    + "; will be dropped if server is in r-o mode");
        }
        if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { // 为只读模式但是该服务器是只读服务器,抛出异常
            String msg = "Refusing session request for not-read-only client "
                + cnxn.getRemoteSocketAddress();
            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { // 请求连接的zxid大于DataTree处理的最大的zxid,抛出异常
            String msg = "Refusing session request for client "
                + cnxn.getRemoteSocketAddress()
                + " as it has seen zxid 0x"
                + Long.toHexString(connReq.getLastZxidSeen())
                + " our last zxid is 0x"
                + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                + " client must try another server";
            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        // 获取超时时间
        int sessionTimeout = connReq.getTimeOut();
        // 获取密码
        byte passwd[] = connReq.getPasswd();
        // 获取最短超时时间
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) { 
            sessionTimeout = minSessionTimeout;
        }
        // 获取最长超时时间
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        // 设置超时时间
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        // 不接收任何packet,直到会话创建成功
        cnxn.disableRecv();
        // 获取会话id
        long sessionId = connReq.getSessionId();
        if (sessionId != 0) { // 表示重新创建会话
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            // 关闭会话
            serverCnxnFactory.closeSession(sessionId);
            // 设置会话id
            cnxn.setSessionId(sessionId);
            // 重新打开会话
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            // 创建会话
            createSession(cnxn, passwd, sessionTimeout);
        }
    }

说明:其首先将传递的ByteBuffer进行反序列化,转化为相应的ConnectRequest,之后进行一系列判断(可能抛出异常),然后获取并判断该ConnectRequest中会话id是否为0,若为0,则表示可以创建会话,否则,重新打开会话。

  4. processPacket函数 

  public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        // 创建请求头
        RequestHeader h = new RequestHeader();
        // 将头反序列化为RequestHeader
        h.deserialize(bia, "header");
        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) { // 需要进行认证(有密码)
            LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            // 将ByteBuffer转化为AuthPacket
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            // 获取AuthPacket的模式
            String scheme = authPacket.getScheme();
            AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if(ap != null) {
                try {
                    // 进行认证
                    authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
                } catch(RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
                    authReturn = KeeperException.Code.AUTHFAILED;                   
                }
            }
            if (authReturn!= KeeperException.Code.OK) { // 认证失败
                if (ap == null) {
                    LOG.warn("No authentication provider for scheme: "
                            + scheme + " has "
                            + ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: " + scheme);
                }
                // send a response...
                // 构造响应头
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.AUTHFAILED.intValue());
                // 发送响应
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                // 关闭连接的信息
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                // 不接收任何packet
                cnxn.disableRecv();
            } else { // 认证成功
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Authentication succeeded for scheme: "
                              + scheme);
                }
                LOG.info("auth success " + cnxn.getRemoteSocketAddress());
                // 构造响应头
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.OK.intValue());
                // 发送响应
                cnxn.sendResponse(rh, null, null);
            }
            return;
        } else {
            if (h.getType() == OpCode.sasl) { // 为SASL类型
                // 处理SASL
                Record rsp = processSasl(incomingBuffer,cnxn);
                // 构造响应头
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                // 发送响应
                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
            }
            else { // 不为SASL类型
                // 创建请求
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
                // 设置请求所有者
                si.setOwner(ServerCnxn.me);
                // 提交请求
                submitRequest(si);
            }
        }
        // 
        cnxn.incrOutstandingRequests(h);
    }

说明:该函数首先将传递的ByteBuffer进行反序列,转化为相应的RequestHeader,然后根据该RequestHeader判断是否需要认证,若认证失败,则构造认证失败的响应并发送给客户端,然后关闭连接,并且再补接收任何packet。若认证成功,则构造认证成功的响应并发送给客户端。若不需要认证,则再判断其是否为SASL类型,若是,则进行处理,然后构造响应并发送给客户端,否则,构造请求并且提交请求。


三、总结


  本篇分析了ZooKeeperServer的源码,了解了其对于请求和会话的处理,也谢谢各位园友的观看~

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
57 1
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
66 1
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
59 0
|
5月前
|
网络协议 Java 应用服务中间件
Tomcat源码分析 (一)----- 手撕Java Web服务器需要准备哪些工作
本文探讨了后端开发中Web服务器的重要性,特别是Tomcat框架的地位与作用。通过解析Tomcat的内部机制,文章引导读者理解其复杂性,并提出了一种实践方式——手工构建简易Web服务器,以此加深对Web服务器运作原理的认识。文章还详细介绍了HTTP协议的工作流程,包括请求与响应的具体格式,并通过Socket编程在Java中的应用实例,展示了客户端与服务器间的数据交换过程。最后,通过一个简单的Java Web服务器实现案例,说明了如何处理HTTP请求及响应,强调虽然构建基本的Web服务器相对直接,但诸如Tomcat这样的成熟框架提供了更为丰富和必要的功能。
|
8月前
|
存储 Java 网络安全
ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
【4月更文挑战第10天】ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
82 1
|
10天前
|
机器学习/深度学习 人工智能 PyTorch
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
阿里云GPU云服务器怎么样?阿里云GPU结合了GPU计算力与CPU计算力,主要应用于于深度学习、科学计算、图形可视化、视频处理多种应用场景,本文为您详细介绍阿里云GPU云服务器产品优势、应用场景以及最新活动价格。
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
|
9天前
|
存储 运维 安全
阿里云弹性裸金属服务器是什么?产品规格及适用场景介绍
阿里云服务器ECS包括众多产品,其中弹性裸金属服务器(ECS Bare Metal Server)是一种可弹性伸缩的高性能计算服务,计算性能与传统物理机无差别,具有安全物理隔离的特点。分钟级的交付周期将提供给您实时的业务响应能力,助力您的核心业务飞速成长。本文为大家详细介绍弹性裸金属服务器的特点、优势以及与云服务器的对比等内容。
|
17天前
|
人工智能 JSON Linux
利用阿里云GPU加速服务器实现pdf转换为markdown格式
随着AI模型的发展,GPU需求日益增长,尤其是个人学习和研究。直接购置硬件成本高且更新快,建议选择阿里云等提供的GPU加速型服务器。
利用阿里云GPU加速服务器实现pdf转换为markdown格式
|
1天前
|
存储 人工智能 网络协议
浅聊阿里云倚天云服务器:c8y、g8y、r8y实例性能详解与活动价格参考
选择一款高性能、高性价比的云服务器对于企业而言至关重要,阿里云推出的倚天云服务器——c8y、g8y、r8y三款实例,它们基于ARM架构,采用阿里自研的倚天710处理器,并基于新一代CIPU架构,通过芯片快速路径加速手段,实现了计算、存储、网络性能的大幅提升。2025年,计算型c8y云服务器活动价格860.65元一年起,通用型g8y云服务器活动价格1187.40元一年起,内存型r8y云服务器活动价格1454.32元一年起。本文将为大家详细解析这三款实例的性能特点、应用场景以及最新的活动价格情况,帮助大家更好地了解阿里云倚天云服务器。