Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

概述


Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。


API介绍


建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。

本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。


ZooKeeper构造方法


可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

  • connectString: Zookeeper服务器的地址,多个地址用逗号分隔
  • sessionTimeout:超时时间
  • watcher:设置默认监听器


ZooKeeper常用方法


ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。

方法 描述
create(String path, byte[] data, List acl, CreateMode createMode) 同步方式创建节点
create(String path, byte[] data, List acl, CreateMode createMode) 异步方式创建节点
delete(String path, int version) 同步方式删除节点
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) 异步方式删除节点
exists(String path, boolean watch) 返回指定路径的节点状态信息,如果不存在返回null
getChildren(String path, boolean watch) 返回指定路径的所有子节点状态信息
getData(String path, boolean watch, Stat stat) 返回指定路径的节点数据和状态信息
setData(String path, byte[] data, int version) 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值


环境准备


  1. 引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>
       <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.24</version>
          <scope>compile</scope>
      </dependency>
        <!--junit测试依赖-->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.5.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
  1. 我们采用junit的方式演示,junit的常用注解作用如下:
  • @BeforeClass – 表示在类中的任意public static void方法执行之前执行
  • @AfterClass – 表示在类中的任意public static void方法执行之后执行
  • @Before – 表示在任意使用@Test注解标注的public void方法执行之前执行
  • @After– 表示在任意使用@Test注解标注的public void方法执行之后执行
  • @Test – 使用该注解标注的public void方法会表示为一个测试方法


测试案例


创建会话和关闭会话


可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。

1671115891098.jpg

参数 说明
connectString 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。可以指定客户端连上connectString中服务器后的根目录,如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为"/foo/bar"的节点,实际该节点的路径为"/app/a/foo/bar"
sessionTimeout 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。
  • 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
  • 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。

代码:

private static final String ZK_ADDR = "10.100.1.14:2181";
    private static final Integer ZK_SESSION_TIMEOUT = 30000;
    private ZooKeeper zooKeeper = null;
    @Before
    public void init() throws IOException, InterruptedException {
        log.info("********************** start zk ..................");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {
            log.info("触发了事件:[{}]", event);
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }
    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
        log.info("************************ close zk ..................");
    }
  • init方法和close方法是用来创建和关闭zk会话,加了@Before@After注解,它会在每个测试用例前后执行。
  • 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。


创建节点


创建节点有同步和异步两种方式。

create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)

说明:

  • 该方法是一个同步创建节点的方法

参数说明:

参数 说明
path znode路径。例如,/path, /app/node
data 存储到znode路径的数据,byte数组,最大1M
acl 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限
  • ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限
  • ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限
  • ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户
  • ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。
  • 我们也可以自己定义权限模式                                                                                  | | createMode | 节点的类型,是一个枚举。-   PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。
  • PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。
  • EPHEMERAL:临时节点,会随着会话的结束而自动删除。
  • EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。
  • CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
  • PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。
  • PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |

create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)

说明:

  • 这是一个异步创建节点的方法

参数说明:

其他参数和上面同步创建节点一样.

参数 说明
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testCreate() throws InterruptedException, KeeperException {
        // 创建一个持久节点,对所有用户开放
        zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 创建一个临时的有序节点,权限模式为对指定ip开放
        Id ip = new Id("ip", "10.100.1.100");
        zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    // 异步创建节点
    @Test
    public void testCreateAsync() throws InterruptedException {
        zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,  new AsyncCallback.StringCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                log.info("rc: [{}]", rc);  // 0代表成功了
                log.info(path);  // 传进来的,添加的节点
                log.info(name);   // 真正查到的节点的名字
                log.info(ctx.toString());  // 上下文参数,ctx传进来的东西
                log.info("create node success!");
            }
        }, "ctx" );
        Thread.sleep(1000);
    }

创建成功的日志

1671115922476.jpg


查看节点


// 同步方式查看节点数据,使用自定义的监听器
byte[] getData(final String path, Watcher watcher, Stat stat)
// 同步方式查看节点数据,使用连接时的监听器    
byte[] getData(String path, boolean watch, Stat stat)
// 异步方式查看节点,使用自定义的监听器
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// 异步方式查看节点,使用注册的连接器
void getData(String path, boolean watch, DataCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
watcher 注册一个监听器
watch 是否使用连接对象中注册的监视器
stat 返回znode的元数据
callBack 异步回调接口
ctx 传递上下文参数

代码

@Test
    public void testGet() throws InterruptedException, KeeperException {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/node1", false, stat);
        log.info("获取到的数据是:" + new String(data));
        log.info("当前节点的版本:" + stat.getVersion());
    }
    @Test
    public void testGetAsync() throws InterruptedException, KeeperException {
        zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
                log.info("rc: " + rc);
                log.info(path);
                log.info(new String(bytes));
                log.info("version: " + stat.getVersion());
            }
        }, null);
        Thread.sleep(1000);
    }


更新节点


// 同步方式更新节点
Stat setData(final String path, byte[] data, int version)
// 异步方式更新节点
void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
data 更新的数据
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testSetData() throws InterruptedException, KeeperException {
        Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1);  // 返回状态信息
        log.info(stat.toString());  // 将状态信息打印
        log.info("当前版本号" + stat.getVersion());
        log.info("节点创建时间" + stat.getCtime());
        log.info("节点修改时间" + stat.getMtime());
    }
    @Test
    public void testSetDataAsync() throws InterruptedException, KeeperException {
        zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                log.info("rc" + rc);  // 0 代表修改成功
                log.info(path); // 输入的节点路径
                log.info("version " + stat.getVersion()); // 当前版本
            }
        }, null);  // 返回状态信息
        Thread.sleep(1000);
    }


删除节点


// 同步方式删除节点
void delete(final String path, int version)
// 异步方式删除节点
void delete(final String path, int version, VoidCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testDelete() throws InterruptedException, KeeperException {
        zooKeeper.delete("/node1", -1);  // 如果节点不存在,会删除失败
        zooKeeper.delete("/node5/child1", -1);  // 如果节点不存在,会删除失败
    }
    @Test
    public void testDeleteAsync() {
        zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                log.info("rc:" + rc);
                log.info(path);
            }
        }, "ctx");
    }


查看子节点


//同步方式查看子节点,传入监听器
List<String> getChildren(final String path, Watcher watcher)
//同步方式查看子节点,是否使用默认的监听器    
List<String> getChildren(String path, boolean watch)
//异步方式查看子节点,传入监听器    
void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)  
//异步方式查看子节点,是否使用默认的监听器  
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)     

代码:

private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    @Test
    public void testGetChild() throws InterruptedException, KeeperException {
        syncCreateNode("/a", "hello");
        syncCreateNode("/a/b", "hello");
        syncCreateNode("/a/c", "hello");
        List<String> children = zooKeeper.getChildren("/a", false);
        log.info("********* children: [{}]", children);
    }
复制代码

结果:

1671115971544.jpg


检查节点是否存在


// 同步方式检查节点是否存在, 传入监听器
 Stat exists(final String path, Watcher watcher)
 // 同步方式检查节点是否存在, 是否用默认监听器
 Stat exists(String path, boolean watch)    
 // 异步方式检查节点是否存在, 传入监听器    
 void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)    
 // 异步方式检查节点是否存在, 是否用默认监听器     
 void exists(String path, boolean watch, StatCallback cb, Object ctx)     

代码:

@Test
    public void testExist() throws InterruptedException, KeeperException {
        syncCreateNode("/alvin", "hello");
        Stat stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
        log.info("delete node /alvin ......");
        zooKeeper.delete("/alvin", -1);
        stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
    }


监听器代码验证


getData、exist是、getChildren三个方法都可以监听对应节点变化。

1671115992804.jpg


验证watch的一次性


  1. 创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
    @Test
    public void testGetWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        byte[] data = zooKeeper.getData("/watch", true, new Stat());
        log.info("getData: [{}]", new String(data));
        Thread.sleep(100000L);
    }
  1. 多次执行更新节点的操作
// 修改数据
    @Test
    public void testUpdateData() throws InterruptedException, KeeperException {
        // 创建临时节点
        zooKeeper.setData("/watch", "bbb".getBytes(), -1);
        Thread.sleep(10000L);
    }
  1. 查看结果, 日志只打印了一次

1671116008446.jpg


通过自定义监听器多次监听


  1. 通过exists创建自定义监听
@Test
    public void testExists() throws KeeperException, InterruptedException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        // 重复使用,用完再注册一个新的
        Stat stat = zooKeeper.exists("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", watchedEvent.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", watchedEvent.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", watchedEvent.getPath());
                        break;
                }
                try {
                    // 重复监听的关键
                    zooKeeper.exists("/watch", this);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        if (stat != null) {
            log.info("version: " + stat.getVersion());
        }
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果,多次响应监听

1671116022429.jpg


通过addWatcher方法实现多次监听


  1. 通过addWatch添加监听器,addWatch方法支持重复监听
@Test
    public void testAddWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        zooKeeper.addWatch("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                switch (event.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", event.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", event.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", event.getPath());
                        break;
                }
            }
        }, AddWatchMode.PERSISTENT);
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果

1671116039880.jpg

说明:

  • zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
  • addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
27天前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
62 0
|
1月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
25 0
|
17天前
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
45 11
|
25天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
6天前
|
Kubernetes Cloud Native Java
探索未来编程新纪元:Quarkus带你秒建高性能Kubernetes原生Java应用,云原生时代的技术狂欢!
Quarkus 是专为 Kubernetes 设计的全栈云原生 Java 框架,凭借其轻量级、快速启动及高效执行特性,在 Java 社区脱颖而出。通过编译时优化与原生镜像支持,Quarkus 提升了应用性能,同时保持了 Java 的熟悉度与灵活性。本文将指导你从创建项目、编写 REST 控制器到构建与部署 Kubernetes 原生镜像的全过程,让你快速上手 Quarkus,体验高效开发与部署的乐趣。
11 0
|
1月前
|
Java API 开发者
|
24天前
|
存储 JavaScript 前端开发
探索React状态管理:Redux的严格与功能、MobX的简洁与直观、Context API的原生与易用——详细对比及应用案例分析
【8月更文挑战第31天】在React开发中,状态管理对于构建大型应用至关重要。本文将探讨三种主流状态管理方案:Redux、MobX和Context API。Redux采用单一存储模型,提供预测性状态更新;MobX利用装饰器语法,使状态修改更直观;Context API则允许跨组件状态共享,无需第三方库。每种方案各具特色,适用于不同场景,选择合适的工具能让React应用更加高效有序。
37 0
|
1月前
|
JSON Java API
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
|
1月前
|
Java API 开发者
针对Java开发者的RESTful API设计与实现指南
本文是一份针对Java开发者的RESTful API设计与实现指南。RESTful API采用表述性状态转移(REST)架构风格,提供无状态、统一接口的服务。在Java中,可通过Spring Boot框架快速构建RESTful API,利用Spring MVC处理HTTP请求,并支持数据绑定、验证及异常处理等功能。此外,还介绍了版本控制、安全性加强、文档编写与测试等最佳实践,帮助开发者打造高性能且可靠的API服务。
26 0
|
1月前
|
Java API
Java8 Lambda 设计和实现问题之在Java 8的Stream API中,parallel=false时collect方法是如何实现的
Java8 Lambda 设计和实现问题之在Java 8的Stream API中,parallel=false时collect方法是如何实现的