深入理解 ZooKeeper单机客户端的启动流程(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 深入理解 ZooKeeper单机客户端的启动流程(二)

思考:#


虽然找到了客户端往服务端发送数据的代码, 但是问题来了, 它发送的什么数据啊? 在上面可以看到,它每次发送的数据都被包装车成了packet类型,并且,继续往下跟进可以看到这个packet来自于一个叫outgoingqueue的队列中


client想往服务端发送什么?其实发送就是我们手动输入的命令,只不过他把我们的命令解析出来并且进行了封装,进行了哪些封装? String-> request -> packet -> socket ,这个packet就在上面的部分被消费


到目前为止,算上一开始的主线程,其实已经有3条线程了, 分别是主线程,SendThread和eventThread


代码读到这里,sendThread部分其实已经结束了,我们直到了它正在消费outgoingqueue中的内容,接下来的任务返回回去,从新回到 ZooKeeperMain中,看一开始主线程时如何处理用户在命令行的输入的


// todo zookeeper的入口方法
    public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
        // todo new ZK客户端
        ZooKeeperMain main = new ZooKeeperMain(args);
        // todo run方法的实现在下面
        main.run();
    }


跟进 main.run(), 主要做了如下几件事

  • 通过反射创建出可以获取控制台输入的对象jline.ConsoleReader
  • 通过反射创建出可以解析键盘录入的对象
  • 开启while循环,等待用户的输入,处理用户的输入executeLine(line);


@SuppressWarnings("unchecked")
    void run() throws KeeperException, IOException, InterruptedException {
        if (cl.getCommand() == null) {
            System.out.println("Welcome to ZooKeeper!");
            boolean jlinemissing = false;
            // only use jline if it's in the classpath
            try {
                // todo jline.ConsoleReader是java命令行的实现类, 获取可从控制台接受输入的对象
                Class<?> consoleC = Class.forName("jline.ConsoleReader");
                Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
                System.out.println("JLine support is enabled");
                // todo 使用反射获取实例
                Object console = consoleC.getConstructor().newInstance();
                Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk);
                // todo 通过反射获取某指定类的指定方法  Completor
                Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
                addCompletor.invoke(console, completor);
                String line;
                Method readLine = consoleC.getMethod("readLine", String.class);
                // todo 我们在命令行中输入的那些命令最终都会来到这里执行
                // todo getPrompt() 方法 就是在控制台上打印出了命令行的前缀---  [zk: " + host + "("+zk.getState()+")" + " " + commandCount + "] "
                while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
                    // todo 执行命令行的输入
                    executeLine(line);
                }
            } catch (ClassNotFoundException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (NoSuchMethodException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (InvocationTargetException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (IllegalAccessException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (InstantiationException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            }
            if (jlinemissing) {
                System.out.println("JLine support is disabled");
                BufferedReader br =
                        new BufferedReader(new InputStreamReader(System.in));
                String line;
                while ((line = br.readLine()) != null) {
                    executeLine(line);
                }
            }
        } else {
            // Command line args non-null.  Run what was passed.
            processCmd(cl);
        }
    }


继续跟进 executeLine(line);,做了如下几件事

  • 处理用户输入
  • 将命令添加到历史命令
  • 处理命令
  • 命令数+1


public void executeLine(String line)
                throws InterruptedException, IOException, KeeperException {
            if (!line.equals("")) {
                cl.parseCommand(line);
                // todo 添加到历史命令
                addToHistory(commandCount, line);
                // todo 具体处理命令
                processCmd(cl);
                // todo 命令次数+1
                commandCount++;
        }
    }


处理命令的逻辑如下:

将命令解析出来,通过if分支语句,判断用户输入的什么命令, 然后再进一步处理


// todo zookeeper客户端, 处理用户输入命令的具体逻辑
    // todo  用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端
    // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类
    protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
        // todo 在这个方法中可以看到很多的命令行所支持的命令
    Stat stat = new Stat();
    // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令  1 2 3 位置可能就是不同的参数
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        return false;
    }
    if (!commandMap.containsKey(cmd)) {
        usage();
        return false;
    }
    boolean watch = args.length > 2;
    String path = null;
    List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
    LOG.debug("Processing " + cmd);
    ...
   // todo 看看这个create命令的实现, 如果是-e 就是很 CreateMode= ephemeral sequential 时序的
    if (cmd.equals("create") && args.length >= 3) {
        int first = 0;
        CreateMode flags = CreateMode.PERSISTENT;
        if ((args[1].equals("-e") && args[2].equals("-s"))
                || (args[1]).equals("-s") && (args[2].equals("-e"))) {
            first += 2;
            flags = CreateMode.EPHEMERAL_SEQUENTIAL;
        } else if (args[1].equals("-e")) {
            first++;
            flags = CreateMode.EPHEMERAL;
        } else if (args[1].equals("-s")) {
            first++;
            flags = CreateMode.PERSISTENT_SEQUENTIAL;
        }
    ...


比如,用户输入的是创建新节点的命令create /path, 就会有下面的函数处理


// todo  调用Zookeeper的 create方法,
    String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);


跟进这个方法 , 主要做了下面几件事

  • 校验合法性
  • 封装进 request
  • 添加acl
  • 提交submitRequest(),他是个重要的阻塞方法,每次执行都会阻塞等待服务端的响应
  • 等待响应结果


public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        // todo 验证,path string 的合法性, 根据去查看
        PathUtils.validatePath(clientPath, createMode.isSequential());
        final String serverPath = prependChroot(clientPath);
        // todo 创建请求头, 不同的操作有不同的头标记
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        // todo 将命令行里面的内容嵌入到request
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        // todo 添加权限
        request.setAcl(acl);
        // todo 通过上下文, 将包装后的用户的request 提交到socket 传递到server , 跟进去看看
        ReplyHeader r =submitRequest
        // todo 判断是否出错了
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
10天前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
25 11
|
18天前
|
存储
ZooKeeper客户端常用命令
ZooKeeper客户端常用命令
25 1
|
2月前
|
算法 Java Linux
zookeeper单机伪集群集群部署
zookeeper单机伪集群集群部署
86 0
|
4月前
|
安全 Java API
Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
2,/usr/local/data/zookeeper-3,/usr/local/data/zookeeper-4,在每个目录中创建文件。创建四个文件夹/usr/local/data/zookeeper-1,/usr/local/data/zookeeper-Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点。己对外提供服务的起始状态。E: 角色, 默认是 participant,即参与过半机制的角色,选举,事务请求过半提交,还有一个是。
|
4月前
Zookeeper的客户端的命令
Zookeeper的客户端的命令
18 0
|
4月前
|
缓存 Java API
Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。
|
4月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
46 2
|
5月前
|
存储 设计模式 算法
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
137 1
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
|
18天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
32 2
|
4月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0