深入理解 ZooKeeper单机服务端的启动(二)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
简介: 深入理解 ZooKeeper单机服务端的启动(二)

启动单节点#


这次再进入这个方法,我们直接跳过它是如果从配置文件中读取出配置信息了,然后直接看它的启动方法


runFromConfig方法主要做了如下几件事

  • 创建ZooKeeperServer 它是单机ZK服务端的实例


如下的ZooKeeperServer相关的属性
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
protected RequestProcessor firstProcessor
以及它可以构建DataTree


  • 创建ZooKeeperServerShutdownHandler 监控ZkServer关闭状态的处理器
  • 创建FileTxnSnapLog 文件快照相关的工具类
  • 给ZKServer绑定上单位时间trickTime(节点心跳交流的时间)
  • 初始化 ZKServer 处理事务,快照相关的工具类
  • 创建上下文的工厂
  • 通过工厂,启动上下文


public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call run() in this thread.
        // todo 请注意,当前线程不会做其他任何事情,因此我们只在当前线程中调用Run方法,而不是开启新线程
        // create a file logger url from the command line args
        // todo 根据命令中的args 创建一个logger文件
        final ZooKeeperServer zkServer = new ZooKeeperServer();
        // Registers shutdown handler which will be used to know the server error or shutdown state changes.
        // todo  注册一个shutdown handler, 通过他了解server发生的error或者了解shutdown 状态的更改
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // todo FileTxnSnapLog工具类, 与 文件快照相关
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
        txnLog.setServerStats(zkServer.serverStats());
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // todo 创建Server上下文的工厂,工厂方法模式
        // todo ServerCnxnFactory是个抽象类,他有不同是实现, NIO版本的 Netty版本的
        cnxnFactory = ServerCnxnFactory.createFactory();
        // todo 建立socket,默认是NIOServerCnxnFactory(是一个线程)
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
        // todo 跟进这个方法
        cnxnFactory.startup(zkServer);


  • 看一下如何创建处理事务,快照日志相关的数据文件的逻辑,可以看到,直接去关联我们配置的dataDir,snapDir,对应着日志存储的目录已经快照存储的目录, 然后封装进FileSnapFileTxnLog对象中


public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
    LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
    // todo 关联上指定数据文件和日志文件
    // todo 给FileTxnSnapLog赋值
    this.dataDir = new File(dataDir, version + VERSION);
    this.snapDir = new File(snapDir, version + VERSION);
    if (!this.dataDir.exists()) {
    ...
    .
    .
   // todo 将这两个文件封装进 FileTxnLog 给当前类维护的两种事务快照( TnxnSnap ) 赋值
    txnLog = new FileTxnLog(this.dataDir);
    snapLog = new FileSnap(this.snapDir);


  • 上下文工厂



如上图,将ServerCnxnFactory.java的继承图,不同的上下文工厂的实现可以创建出不同的上下文,通过这个图可以看到,不仅支持传统的NIO,还有一套Netty的实现,当前我选择的是原生的实现NIOServerCnxnFactory的实现,那么由他创建出来的就是NIOServerCnxn


启动流程如下图



点击查看上图原文地址( zhaoyu_nb)


上下文工厂实例化服务端的NIOSocket#


在这个方法中创建了ZooKeeperThread,这个类ZK中设计的线程类,几乎全部的线程都由此类完成,当前方法中的做法是将创建的Thread赋值给了当前的类的引用,实际上约等于当前类就是线程类,还有需要注意的地方就是虽然进行了初始化,但是并没有开启

此处看到的就是java原生的NIO Socket编程, 当前线程类被设置成守护线程


Thread thread;
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();
    // todo 把当前类作为线程
    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    //todo 所以这里的这个线程是为了和JVM生命周期绑定,只剩下这个线程时已经没有意义了,应该关闭掉。
    thread.setDaemon(true);
    maxClientCnxns = maxcc;
    // todo 看到了NIO原生的代码,使用打开服务端的 Channel, 绑定端口,设置为非阻塞,注册上感兴趣的事件是 accept 连接事件
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    ss.register(selector, SelectionKey.OP_ACCEPT);
}


  • 由上下文工厂实例化的NIOServerCnxn

下面是它的属性,可以看到其实这个上下文涵盖的很全面,甚至服务端的ZK都被他维护着,


NIOServerCnxnFactory factory;
    final SocketChannel sock;
    protected final SelectionKey sk;
    boolean initialized;
    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    ByteBuffer incomingBuffer = lenBuffer;
    LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
    int sessionTimeout;
    protected final ZooKeeperServer zkServer;


上下文工厂(ServerFactoryCnxn)启动#


看完了ZooKeeperServerMainrunFromConfig方法中的创建ZKServer,FileTxnSnapLog等重要对象的逻辑,下面,上下文启动, 直接点击去查看这个方法,肯定直接进入ServerFactoryCnxn,我们选择的是它的实现类NIOServerCnxnFactory


public void runFromConfig(ServerConfig config) throws IOException {
        .
        .
        . 
        cnxnFactory.startup(zkServer);


下面是NIOServerCnxnFactory的实现,它做的第一件事就是开启上面实例化的所说的线程类,这条线程的开启标记着,服务端从此可以接收客户端发送的请求了


这个方法还做了如下三件事

  • 将ZooKeeperServer交给上下文维护
  • 因为这个是启动,所以从磁盘中完成数据的恢复
  • 继续运行
  • 创建计时器
  • 开启计时器
  • 开启三条处理器
  • 注册JMX
  • 修改运行的状态
  • 唤醒全部线程


public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    // todo start(); ==> run() 开启线程
    start(); //todo 实现在上面, 到目前为止服务端已经可以接受客户端的请求了
    // todo 将ZKS 交给NIOServerCnxnFactory管理,意味着NIOServerCnxnFactory是目前来说,服务端功能最多的对象
    setZooKeeperServer(zks);
    // todo 因为是服务端刚刚启动,需要从从disk将数据恢复到内存
    zks.startdata();
    // todo 继续跟进
    zks.startup();
}


完成数据的恢复#


跟进startData()方法, 看到先创建ZKDatabase,这个对象就是存在于内存中的对象,对磁盘中数据可视化描述


// todo 将数据加载进缓存中
public void startdata() 
throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        // todo 如果没初始化的话就初始化
        zkDb = new ZKDatabase(this.txnLogFactory);
    }  
    if (!zkDb.isInitialized()) {
        // todo 恢复数据
        loadData();
    }
}


跟进创建ZKDataBase的逻辑, 最直观的可以看见,这个DB维护了DataTree和SnapLog


public ZKDatabase(FileTxnSnapLog snapLog) {
    // todo 创建了DataTree 数据树的空对象
    dataTree = new DataTree();
    sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
    //todo 用初始化好了的存有关于系统事务日志将snaplog初始化
    this.snapLog = snapLog;
}


loaddata()


public void loadData() throws IOException, InterruptedException {
        // todo zkDatabase 已经初始化了
        if(zkDb.isInitialized()){
            // todo zxid = 最近的一次znode的事务id
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        } else {
            //todo  zkDB 没有初始化就使用  zkDb.loadDataBase() , 跟进去看, 他从快照中获取数据
            setZxid(zkDb.loadDataBase());
        }
        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) {
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                deadSessions.add(session);
            }
        }
        zkDb.setDataTreeInit(true);
        for (long session : deadSessions) {
            // XXX: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }
    }


  • 继续启动zks.startup(); 它的源码在下面,其中的计时器类也是一个线程类


// todo 继续启动, 服务端和客户端建立连接后会保留一个session, 其中这个sessiion的生命周期倒计时就在下面的 createSessionTracker();
    public synchronized void startup() {
        if (sessionTracker == null) {
            // todo 创建session计时器
            createSessionTracker();
        }
        // todo 开启计时器
        startSessionTracker();
        // todo 设置请求处理器, zookeeper中存在不同的请求处理器, 就在下面
        setupRequestProcessors();
        //todo 是一个为应用程序、设备、系统等植入管理功能的框架。
        //todo JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用
        registerJMX();
        // todo 修改状态  --> running
        setState(State.RUNNING);
        // todo 唤醒所有线程, 因为前面有一个线程等待处理器 睡了一秒
        notifyAll();
    }


设置请求处理器#


着重看一下它的setupRequestProcessors()添加请求处理器,单机模式下仅仅存在三个处理器,除了最后一个不是线程类之外,其他两个都是线程类

  • PrepRequestProcessor
  • 校验权限
  • 修改请求的状态
  • SyncRequestProcessor
  • 将request持久化日志文件
  • 打快照
  • FinalRequestProcessor
  • 响应客户端的请求


protected void setupRequestProcessors() {
        // todo 下面的三个处理器的第二个参数是在指定 下一个处理器是谁
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        // todo  在服务端, 数据的处理  socket -> packet -> request -> queue
        // todo 然后由下面的requestprocessor 链 进行下一步处理request
        // todo  开启新线程, 服务端接收的客户端的请求都放在了 队列中,用处理器异步处理
        ((SyncRequestProcessor)syncProcessor).start();
        //todo  第一个处理器 , 下一个处理器是 syncProcessor  最后一个处理器 finalProcessor
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
                // todo  开启新线程  服务端接收的客户端的请求都放在了 队列中,用处理器异步处理
        ((PrepRequestProcessor)firstProcessor).start();
    }


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
缓存 Java Linux
ZooKeeper从单机到集群
ZooKeeper从单机到集群
78 1
Zookeeper学习---2、客户端API操作、客户端向服务端写数据流程
Zookeeper学习---2、客户端API操作、客户端向服务端写数据流程
Zookeeper学习---2、客户端API操作、客户端向服务端写数据流程
|
6月前
|
算法 Java Linux
zookeeper单机伪集群集群部署
zookeeper单机伪集群集群部署
128 0
|
6月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
126 2
|
6月前
|
存储 Shell Linux
ZooKeeper【部署 01】单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置+shell自动部署脚本(一篇入门zookeeper)
ZooKeeper【部署 01】单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置+shell自动部署脚本(一篇入门zookeeper)
535 0
Zookeeper系列 (三) [单机版连接客户端]
进入zookeeper的 bin目录 运行脚本: ./zkCli.sh
109 0
Zookeeper系列 (三) [单机版连接客户端]
|
消息中间件 分布式计算 算法
从一个 Demo 说起 Zookeeper 服务端源码
Zookeeper目前是Apache下的开源项目,作为最流行的分布式协调系统之一,我们平时开发中熟知的Dubbo、Kafka、Elastic-Job、Hadoop、HBase等顶级开源项目中的分布式协调功能都是通过借助Zookeeper来实现的,可以看到想要在生产中保障Zookeeper服务的稳定、快速排查系统问题,深入探究Zookeeper系统的原理是有必要的,那Zookeeper是什么呢?
112 0
从一个 Demo 说起 Zookeeper 服务端源码
|
Ubuntu Java 大数据
大数据Zookeeper-01.单机环境安装
Zookeeper单机环境安装
123 0
大数据Zookeeper-01.单机环境安装
Zookeeper单机伪集群搭建(三)
Zookeeper单机伪集群搭建(三)
102 0
Zookeeper单机伪集群搭建(三)
|
Apache
Zookeeper单机模式和集群模式环境搭建
Zookeeper单机模式和集群模式环境搭建
154 0