Zookeeper原生Java api使用(四)

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Zookeeper原生Java api使用(四)

zk客户端与服务器连接

客户端和zk服务端链接是一个异步的过程

当连接成功后后,客户端会收的一个watch通知

/**
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 连接demo演示
 */
public class ZKConnect implements Watcher {
  final static Logger log = LoggerFactory.getLogger(ZKConnect.class);
  public static final String zkServerPath = "192.168.254.130:2181";
//  public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183";
  public static final Integer timeout = 5000;
  public static void main(String[] args) throws Exception {
    /**
     * 客户端和zk服务端链接是一个异步的过程
     * 当连接成功后后,客户端会收的一个watch通知
     *
     * 参数:
     * connectString:连接服务器的ip字符串,
     *    比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
     *    可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
     *    也可以在ip后加路径
     * sessionTimeout:超时时间,心跳收不到了,那就超时
     * watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
     * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
     *                 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
     * sessionId:会话的id
     * sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
     */
    ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect());
    log.warn("客户端开始连接zookeeper服务器...");
    log.warn("连接状态:{}", zk.getState());
    new Thread().sleep(2000);
    log.warn("连接状态:{}", zk.getState());
  }
  @Override
  public void process(WatchedEvent event) {
    log.warn("接受到watch通知:{}", event);
  }
}

zk会话重连机制

获取到连接zk的会话id和会话密码,我们就可以再次回到会话中

/**
 * 
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 恢复之前的会话连接demo演示
 */
public class ZKConnectSessionWatcher implements Watcher {
  final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
    new Thread().sleep(1000);
    long sessionId = zk.getSessionId();
    String ssid = "0x" + Long.toHexString(sessionId);
    System.out.println(ssid);
    byte[] sessionPassword = zk.getSessionPasswd();
    log.warn("客户端开始连接zookeeper服务器...");
    log.warn("连接状态:{}", zk.getState());
    log.warn("连接状态:{}", zk.getState());
    new Thread().sleep(200);
    // 开始会话重连
    log.warn("开始会话重连...");
    ZooKeeper zkSession = new ZooKeeper(zkServerPath, 
                      timeout, 
                      new ZKConnectSessionWatcher(), 
                      sessionId, 
                      sessionPassword);
    log.warn("重新连接状态zkSession:{}", zkSession.getState());
    new Thread().sleep(1000);
    log.warn("重新连接状态zkSession:{}", zkSession.getState());
  }
  @Override
  public void process(WatchedEvent event) {
    log.warn("接受到watch通知:{}", event);
  }
}

java操作zk,节点的创建,删除,设置

/**
 * 
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 操作demo演示
 */
public class ZKNodeOperator implements Watcher {
  private ZooKeeper zookeeper = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  public ZKNodeOperator() {}
  public ZKNodeOperator(String connectString) {
    try {
      zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator());
    } catch (IOException e) {
      e.printStackTrace();
      if (zookeeper != null) {
        try {
          zookeeper.close();
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  /**
   * 
   * @Title: ZKOperatorDemo.java
   * @Description: 创建zk节点
   */
  public void createZKNode(String path, byte[] data, List<ACL> acls) {
    String result = "";
    try {
      /**
       * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
       * 参数:
       * path:创建的路径
       * data:存储的数据的byte[]
       * acl:控制权限策略
       *      Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
       *      CREATOR_ALL_ACL --> auth:user:password:cdrwa
       * createMode:节点类型, 是一个枚举
       *      PERSISTENT:持久节点
       *      PERSISTENT_SEQUENTIAL:持久顺序节点
       *      EPHEMERAL:临时节点
       *      EPHEMERAL_SEQUENTIAL:临时顺序节点
       */
      result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
      //异步创建,CreateCallBack是一个回调函数
/*      String ctx = "{'create':'success'}";
      zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);*/
      System.out.println("创建节点:\t" + result + "\t成功...");
      new Thread().sleep(2000);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  public static void main(String[] args) throws Exception {
    ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
    // 创建zk节点
    zkServer.createZKNode("/testnode", "testnode".getBytes(), Ids.OPEN_ACL_UNSAFE);
    /**
     * 设置节点数据
     * 参数:
     * path:节点路径
     * data:数据
     * version:数据状态
     */
//    Stat status  = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 2);
//    System.out.println(status.getVersion());
    /**
     * 同步删除数据,没有返回数据
     * 参数:
     * path:节点路径
     * version:数据状态
     */
/*    zkServer.createZKNode("/test-delete-node", "123".getBytes(), Ids.OPEN_ACL_UNSAFE);
    zkServer.getZookeeper().delete("/test-delete-node", 2);
    /**
     * 异步删除数据,在回调函数里面返回
     * 参数:
     * path:节点路径
     * version:数据状态
     */
    String ctx = "{'delete':'success'}";
    zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx);
    Thread.sleep(2000);*/
  }
  public ZooKeeper getZookeeper() {
    return zookeeper;
  }
  public void setZookeeper(ZooKeeper zookeeper) {
    this.zookeeper = zookeeper;
  }
  @Override
  public void process(WatchedEvent event) {
  }
}

删除回调函数

public class DeleteCallBack implements VoidCallback {
  @Override
  public void processResult(int rc, String path, Object ctx) {
    System.out.println("删除节点" + path);
    System.out.println((String)ctx);
  }
}

获取zk节点数据

使用CountDownLatch来监听

public class ZKGetNodeData implements Watcher {
  private ZooKeeper zookeeper = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  //存放节点数据
  private static Stat stat = new Stat();
  public ZKGetNodeData() {}
  public ZKGetNodeData(String connectString) {
    try {
      zookeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
    } catch (IOException e) {
      e.printStackTrace();
      if (zookeeper != null) {
        try {
          zookeeper.close();
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  private static CountDownLatch countDown = new CountDownLatch(1);
  public static void main(String[] args) throws Exception {
    ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
    /**
     * 参数:
     * path:节点路径
     * watch:true或者false,注册一个watch事件
     * stat:状态
     */
    byte[] resByte = zkServer.getZookeeper().getData("/bushro", true, stat);
    String result = new String(resByte);
    System.out.println("当前值:" + result);
    countDown.await();
  }
  @Override
  public void process(WatchedEvent event) {
    try {
      if(event.getType() == EventType.NodeDataChanged){
        ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
        byte[] resByte = zkServer.getZookeeper().getData("/bushro", false, stat);
        String result = new String(resByte);
        System.out.println("更改后的值:" + result);
        System.out.println("版本号变化dversion:" + stat.getVersion());
        countDown.countDown();
      } else if(event.getType() == EventType.NodeCreated) {
      } else if(event.getType() == EventType.NodeChildrenChanged) {
      } else if(event.getType() == EventType.NodeDeleted) {
      } 
    } catch (KeeperException e) { 
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  public ZooKeeper getZookeeper() {
    return zookeeper;
  }
  public void setZookeeper(ZooKeeper zookeeper) {
    this.zookeeper = zookeeper;
  }
}

获取zk子节点列表

public class ZKGetChildrenList implements Watcher {
  private ZooKeeper zookeeper = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  public ZKGetChildrenList() {}
  public ZKGetChildrenList(String connectString) {
    try {
      zookeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
    } catch (IOException e) {
      e.printStackTrace();
      if (zookeeper != null) {
        try {
          zookeeper.close();
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  private static CountDownLatch countDown = new CountDownLatch(1);
  public static void main(String[] args) throws Exception {
    ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
    /**
     * 参数:
     * path:父节点路径
     * watch:true或者false,注册一个watch事件
     */
//    List<String> strChildList = zkServer.getZookeeper().getChildren("/bushro", true);
//    for (String s : strChildList) {
//      System.out.println(s);
//    }
    // 异步调用
    String ctx = "{'callback':'ChildrenCallback'}";
    //回调函数一个是有返回子节点信息(Children2CallBack),一个没有(ChildrenCallBack)
//    zkServer.getZookeeper().getChildren("/imooc", true, new ChildrenCallBack(), ctx);
    zkServer.getZookeeper().getChildren("/bushro", true, new Children2CallBack(), ctx);
    countDown.await();
  }
  @Override
  public void process(WatchedEvent event) {
    try {
      if(event.getType()==EventType.NodeChildrenChanged){
        System.out.println("NodeChildrenChanged");
        ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
        List<String> strChildList = zkServer.getZookeeper().getChildren(event.getPath(), false);
        for (String s : strChildList) {
          System.out.println("子节点: "+s);
        }
        countDown.countDown();
      } else if(event.getType() == EventType.NodeCreated) {
        System.out.println("NodeCreated");
      } else if(event.getType() == EventType.NodeDataChanged) {
        System.out.println("NodeDataChanged");
      } else if(event.getType() == EventType.NodeDeleted) {
        System.out.println("NodeDeleted");
      } 
    } catch (KeeperException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  public ZooKeeper getZookeeper() {
    return zookeeper;
  }
  public void setZookeeper(ZooKeeper zookeeper) {
    this.zookeeper = zookeeper;
  }
}

回调函数

public class Children2CallBack implements Children2Callback {
  @Override
  public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    for (String s : children) {
      System.out.println(s);
    }
    System.out.println("ChildrenCallback:" + path);
    System.out.println((String)ctx);  
    System.out.println("czxid:"+stat.getCzxid());
    System.out.println("ctime:"+stat.getCtime());
    System.out.println("mzxid:"+stat.getMzxid());
    System.out.println("mtime:"+stat.getMtime());
    System.out.println("pzxid:"+stat.getPzxid());
    System.out.println("cversion:"+stat.getCversion());
    System.out.println("version:"+stat.getVersion());
    System.out.println("ephemeralOwner:"+stat.getEphemeralOwner());
    System.out.println("numChildren:"+stat.getNumChildren());
  }
}

删除子节点后触发的事件

20190721131838665.png

判断节否存在

public class ZKNodeExist implements Watcher {
  private ZooKeeper zookeeper = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  public ZKNodeExist() {}
  public ZKNodeExist(String connectString) {
    try {
      zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist());
    } catch (IOException e) {
      e.printStackTrace();
      if (zookeeper != null) {
        try {
          zookeeper.close();
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  private static CountDownLatch countDown = new CountDownLatch(1);
  public static void main(String[] args) throws Exception {
    ZKNodeExist zkServer = new ZKNodeExist(zkServerPath);
    /**
     * 参数:
     * path:节点路径
     * watch:watch
     */
    Stat stat = zkServer.getZookeeper().exists("/bushro", true);
    if (stat != null) {
      System.out.println("查询的节点版本为dataVersion:" + stat.getVersion());
    } else {
      System.out.println("该节点不存在...");
    }
    countDown.await();
  }
  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == EventType.NodeCreated) {
      System.out.println("节点创建");
      countDown.countDown();
    } else if (event.getType() == EventType.NodeDataChanged) {
      System.out.println("节点数据改变");
      countDown.countDown();
    } else if (event.getType() == EventType.NodeDeleted) {
      System.out.println("节点删除");
      countDown.countDown();
    }
  }
  public ZooKeeper getZookeeper() {
    return zookeeper;
  }
  public void setZookeeper(ZooKeeper zookeeper) {
    this.zookeeper = zookeeper;
  }
}

Acl相关操作

public class ZKNodeAcl implements Watcher {
  private ZooKeeper zookeeper = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public static final Integer timeout = 5000;
  public ZKNodeAcl() {}
  public ZKNodeAcl(String connectString) {
    try {
      zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl());
    } catch (IOException e) {
      e.printStackTrace();
      if (zookeeper != null) {
        try {
          zookeeper.close();
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  public void createZKNode(String path, byte[] data, List<ACL> acls) {
    String result = "";
    try {
      /**
       * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
       * 参数:
       * path:创建的路径
       * data:存储的数据的byte[]
       * acl:控制权限策略
       *      Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
       *      CREATOR_ALL_ACL --> auth:user:password:cdrwa
       * createMode:节点类型, 是一个枚举
       *      PERSISTENT:持久节点
       *      PERSISTENT_SEQUENTIAL:持久顺序节点
       *      EPHEMERAL:临时节点
       *      EPHEMERAL_SEQUENTIAL:临时顺序节点
       */
      result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
      System.out.println("创建节点:\t" + result + "\t成功...");
    } catch (KeeperException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } 
  }
  public static void main(String[] args) throws Exception {
    ZKNodeAcl zkServer = new ZKNodeAcl(zkServerPath);
    /**
     * ======================  创建node start  ======================  
     */
    //acl 任何人都可以访问
    zkServer.createZKNode("/aclbushro", "test".getBytes(), Ids.OPEN_ACL_UNSAFE);
    //(测试1)自定义用户认证访问
    List<ACL> acls = new ArrayList<ACL>();
    Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456"));
    Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456"));
    //设置用户以及对应的权限
    acls.add(new ACL(Perms.ALL, bushro1));
    acls.add(new ACL(Perms.READ, bushro2));
    acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2));
    zkServer.createZKNode("/aclbushro/testdigest", "testdigest".getBytes(), acls);
    //注册过的用户必须通过addAuthInfo才能操作节点,参考命令行 addauth
    zkServer.getZookeeper().addAuthInfo("digest", "bushro1:123456".getBytes());
    zkServer.createZKNode("/aclbushro/testdigest/childtest", "childtest".getBytes(), Ids.CREATOR_ALL_ACL);
    Stat stat = new Stat();
    byte[] data = zkServer.getZookeeper().getData("/aclbushro/testdigest", false, stat);
    System.out.println(new String(data));
    zkServer.getZookeeper().setData("/aclbushro/testdigest", "now".getBytes(), 1);
    //(测试二)ip方式的acl
    List<ACL> aclsIP = new ArrayList<ACL>();
    Id ipId1 = new Id("ip", "192.168.1.6");
    aclsIP.add(new ACL(Perms.ALL, ipId1));
    zkServer.createZKNode("/aclbushro/iptest6", "iptest".getBytes(), aclsIP);
    //验证ip是否有权限
    zkServer.getZookeeper().setData("/aclbushro/iptest6", "now".getBytes(), 1);
    Stat stat = new Stat();
    byte[] data = zkServer.getZookeeper().getData("/aclbushro/iptest6", false, stat);
    System.out.println(new String(data));
    System.out.println(stat.getVersion());
  }
  public ZooKeeper getZookeeper() {
    return zookeeper;
  }
  public void setZookeeper(ZooKeeper zookeeper) {
    this.zookeeper = zookeeper;
  }
  @Override
  public void process(WatchedEvent event) {
  }
}

20190721134148459.png

相关文章
|
3月前
|
JSON Java API
【干货满满】分享京东API接口到手价,用Java语言实现
本示例使用 Java 调用京东开放平台商品价格及优惠信息 API,通过商品详情和促销接口获取到手价(含优惠券、满减等),包含签名生成、HTTP 请求及响应解析逻辑,适用于比价工具、电商系统集成等场景。
|
2月前
|
Ubuntu Java 物联网
Java原生结合MQTTX,完成心跳对话
简介:本文带你用Java结合MQTT协议与EMQX服务器,在Ubuntu上实现两个程序的“隔空传话”。通过搭建消息代理、编写发送/接收代码,让Java应用实现实时通信,附完整源码与调试技巧,轻松掌握物联网通信核心技能。✨
248 2
|
8月前
|
缓存 监控 负载均衡
如何提升 API 性能:来自 Java 和测试开发者的优化建议
本文探讨了如何优化API响应时间,提升用户体验。通过缓存(如Redis/Memcached)、减少数据负载(REST过滤字段或GraphQL精确请求)、负载均衡(Nginx/AWS等工具)、数据压缩(Gzip/Brotli)、限流节流、监控性能(Apipost/New Relic等工具)、升级基础设施、减少第三方依赖、优化数据库查询及采用异步处理等方式,可显著提高API速度。快速响应的API不仅让用户满意,还能增强应用整体性能。
|
3月前
|
JSON Java API
【干货满满】分享拼多多API接口到手价,用Java语言实现
本方案基于 Java 实现调用拼多多开放平台商品详情 API,通过联盟接口获取商品到手价(含拼团折扣与优惠券),包含签名生成、HTTP 请求及响应解析逻辑,适用于电商比价、导购系统集成。
|
3月前
|
JSON Java API
【干货满满】分享淘宝API接口到手价,用Java语言实现
本文介绍了如何使用 Java 调用淘宝开放平台 API 获取商品到手价,涵盖依赖配置、签名生成、HTTP 请求与响应解析等核心实现步骤。
|
4月前
|
JSON JavaScript 前端开发
Python+JAVA+PHP语言,苏宁商品详情API
调用苏宁商品详情API,可通过HTTP/HTTPS发送请求并解析响应数据,支持多种编程语言,如JavaScript、Java、PHP、C#、Ruby等。核心步骤包括构造请求URL、发送GET/POST请求及解析JSON/XML响应。不同语言示例展示了如何获取商品名称与价格等信息,实际使用时请参考苏宁开放平台最新文档以确保兼容性。
|
8月前
|
前端开发 Cloud Native Java
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
|
9月前
|
Linux 网络安全 Docker
尼恩一键开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解)
尼恩提供了一系列文章,旨在帮助开发者轻松搭建一键开发环境,涵盖Java分布式、高并发场景下的多种技术组件安装与配置。内容包括但不限于Windows和CentOS虚拟机的安装与排坑指南、MySQL、Kafka、Redis、Zookeeper等关键组件在Linux环境下的部署教程,并附带详细的视频指导。此外,还特别介绍了Vagrant这一虚拟环境部署工具,
尼恩一键开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解)
|
8月前
|
缓存 安全 Java
《从头开始学java,一天一个知识点》之:字符串处理:String类的核心API
🌱 **《字符串处理:String类的核心API》一分钟速通!** 本文快速介绍Java中String类的3个高频API:`substring`、`indexOf`和`split`,并通过代码示例展示其用法。重点提示:`substring`的结束索引不包含该位置,`split`支持正则表达式。进一步探讨了String不可变性的高效设计原理及企业级编码规范,如避免使用`new String()`、拼接时使用`StringBuilder`等。最后通过互动解密游戏帮助读者巩固知识。 (上一篇:《多维数组与常见操作》 | 下一篇预告:《输入与输出:Scanner与System类》)
235 11
|
9月前
|
数据采集 存储 Java
Java爬虫获取微店店铺所有商品API接口设计与实现
本文介绍如何使用Java设计并实现一个爬虫程序,以获取微店店铺的所有商品信息。通过HttpClient发送HTTP请求,Jsoup解析HTML页面,提取商品名称、价格、图片链接等数据,并将其存储到本地文件或数据库中。文中详细描述了爬虫的设计思路、代码实现及注意事项,包括反爬虫机制、数据合法性和性能优化。此方法可帮助商家了解竞争对手,为消费者提供更全面的商品比较。
下一篇
oss云网关配置