思考:#
虽然找到了客户端往服务端发送数据的代码, 但是问题来了, 它发送的什么数据啊? 在上面可以看到,它每次发送的数据都被包装车成了packet类型,并且,继续往下跟进可以看到这个packet来自于一个叫outgoingqueue的队列中
client想往服务端发送什么?其实发送就是我们手动输入的命令,只不过他把我们的命令解析出来并且进行了封装,进行了哪些封装? String-> request -> packet -> socket ,这个packet就在上面的部分被消费
到目前为止,算上一开始的主线程,其实已经有3条线程了, 分别是主线程,SendThread和eventThread
代码读到这里,sendThread部分其实已经结束了,我们直到了它正在消费outgoingqueue中的内容,接下来的任务返回回去,从新回到 ZooKeeperMain中,看一开始主线程时如何处理用户在命令行的输入的
// todo zookeeper的入口方法 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { // todo new ZK客户端 ZooKeeperMain main = new ZooKeeperMain(args); // todo run方法的实现在下面 main.run(); }
跟进 main.run()
, 主要做了如下几件事
- 通过反射创建出可以获取控制台输入的对象
jline.ConsoleReader
- 通过反射创建出可以解析键盘录入的对象
- 开启while循环,等待用户的输入,处理用户的输入
executeLine(line);
@SuppressWarnings("unchecked") void run() throws KeeperException, IOException, InterruptedException { if (cl.getCommand() == null) { System.out.println("Welcome to ZooKeeper!"); boolean jlinemissing = false; // only use jline if it's in the classpath try { // todo jline.ConsoleReader是java命令行的实现类, 获取可从控制台接受输入的对象 Class<?> consoleC = Class.forName("jline.ConsoleReader"); Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor"); System.out.println("JLine support is enabled"); // todo 使用反射获取实例 Object console = consoleC.getConstructor().newInstance(); Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk); // todo 通过反射获取某指定类的指定方法 Completor Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor")); addCompletor.invoke(console, completor); String line; Method readLine = consoleC.getMethod("readLine", String.class); // todo 我们在命令行中输入的那些命令最终都会来到这里执行 // todo getPrompt() 方法 就是在控制台上打印出了命令行的前缀--- [zk: " + host + "("+zk.getState()+")" + " " + commandCount + "] " while ((line = (String) readLine.invoke(console, getPrompt())) != null) { // todo 执行命令行的输入 executeLine(line); } } catch (ClassNotFoundException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (NoSuchMethodException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InvocationTargetException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (IllegalAccessException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InstantiationException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } if (jlinemissing) { System.out.println("JLine support is disabled"); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; while ((line = br.readLine()) != null) { executeLine(line); } } } else { // Command line args non-null. Run what was passed. processCmd(cl); } }
继续跟进 executeLine(line);
,做了如下几件事
- 处理用户输入
- 将命令添加到历史命令
- 处理命令
- 命令数+1
public void executeLine(String line) throws InterruptedException, IOException, KeeperException { if (!line.equals("")) { cl.parseCommand(line); // todo 添加到历史命令 addToHistory(commandCount, line); // todo 具体处理命令 processCmd(cl); // todo 命令次数+1 commandCount++; } }
处理命令的逻辑如下:
将命令解析出来,通过if分支语句,判断用户输入的什么命令, 然后再进一步处理
// todo zookeeper客户端, 处理用户输入命令的具体逻辑 // todo 用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端 // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { // todo 在这个方法中可以看到很多的命令行所支持的命令 Stat stat = new Stat(); // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令 1 2 3 位置可能就是不同的参数 String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; List<ACL> acl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); ... // todo 看看这个create命令的实现, 如果是-e 就是很 CreateMode= ephemeral sequential 时序的 if (cmd.equals("create") && args.length >= 3) { int first = 0; CreateMode flags = CreateMode.PERSISTENT; if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { first += 2; flags = CreateMode.EPHEMERAL_SEQUENTIAL; } else if (args[1].equals("-e")) { first++; flags = CreateMode.EPHEMERAL; } else if (args[1].equals("-s")) { first++; flags = CreateMode.PERSISTENT_SEQUENTIAL; } ...
比如,用户输入的是创建新节点的命令create /path
, 就会有下面的函数处理
// todo 调用Zookeeper的 create方法, String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);
跟进这个方法 , 主要做了下面几件事
- 校验合法性
- 封装进 request
- 添加acl
- 提交submitRequest(),他是个重要的阻塞方法,每次执行都会阻塞等待服务端的响应
- 等待响应结果
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; // todo 验证,path string 的合法性, 根据去查看 PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); // todo 创建请求头, 不同的操作有不同的头标记 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); // todo 将命令行里面的内容嵌入到request request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } // todo 添加权限 request.setAcl(acl); // todo 通过上下文, 将包装后的用户的request 提交到socket 传递到server , 跟进去看看 ReplyHeader r =submitRequest // todo 判断是否出错了 if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }