Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

概述


Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。


API介绍


建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。

本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。


ZooKeeper构造方法


可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

  • connectString: Zookeeper服务器的地址,多个地址用逗号分隔
  • sessionTimeout:超时时间
  • watcher:设置默认监听器


ZooKeeper常用方法


ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。

方法 描述
create(String path, byte[] data, List acl, CreateMode createMode) 同步方式创建节点
create(String path, byte[] data, List acl, CreateMode createMode) 异步方式创建节点
delete(String path, int version) 同步方式删除节点
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) 异步方式删除节点
exists(String path, boolean watch) 返回指定路径的节点状态信息,如果不存在返回null
getChildren(String path, boolean watch) 返回指定路径的所有子节点状态信息
getData(String path, boolean watch, Stat stat) 返回指定路径的节点数据和状态信息
setData(String path, byte[] data, int version) 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值


环境准备


  1. 引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>
       <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.24</version>
          <scope>compile</scope>
      </dependency>
        <!--junit测试依赖-->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.5.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
  1. 我们采用junit的方式演示,junit的常用注解作用如下:
  • @BeforeClass – 表示在类中的任意public static void方法执行之前执行
  • @AfterClass – 表示在类中的任意public static void方法执行之后执行
  • @Before – 表示在任意使用@Test注解标注的public void方法执行之前执行
  • @After– 表示在任意使用@Test注解标注的public void方法执行之后执行
  • @Test – 使用该注解标注的public void方法会表示为一个测试方法


测试案例


创建会话和关闭会话


可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。

1671115891098.jpg

参数 说明
connectString 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。可以指定客户端连上connectString中服务器后的根目录,如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为"/foo/bar"的节点,实际该节点的路径为"/app/a/foo/bar"
sessionTimeout 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。
  • 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
  • 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。

代码:

private static final String ZK_ADDR = "10.100.1.14:2181";
    private static final Integer ZK_SESSION_TIMEOUT = 30000;
    private ZooKeeper zooKeeper = null;
    @Before
    public void init() throws IOException, InterruptedException {
        log.info("********************** start zk ..................");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {
            log.info("触发了事件:[{}]", event);
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }
    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
        log.info("************************ close zk ..................");
    }
  • init方法和close方法是用来创建和关闭zk会话,加了@Before@After注解,它会在每个测试用例前后执行。
  • 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。


创建节点


创建节点有同步和异步两种方式。

create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)

说明:

  • 该方法是一个同步创建节点的方法

参数说明:

参数 说明
path znode路径。例如,/path, /app/node
data 存储到znode路径的数据,byte数组,最大1M
acl 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限
  • ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限
  • ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限
  • ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户
  • ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。
  • 我们也可以自己定义权限模式                                                                                  | | createMode | 节点的类型,是一个枚举。-   PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。
  • PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。
  • EPHEMERAL:临时节点,会随着会话的结束而自动删除。
  • EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。
  • CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
  • PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。
  • PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |

create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)

说明:

  • 这是一个异步创建节点的方法

参数说明:

其他参数和上面同步创建节点一样.

参数 说明
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testCreate() throws InterruptedException, KeeperException {
        // 创建一个持久节点,对所有用户开放
        zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 创建一个临时的有序节点,权限模式为对指定ip开放
        Id ip = new Id("ip", "10.100.1.100");
        zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    // 异步创建节点
    @Test
    public void testCreateAsync() throws InterruptedException {
        zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,  new AsyncCallback.StringCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                log.info("rc: [{}]", rc);  // 0代表成功了
                log.info(path);  // 传进来的,添加的节点
                log.info(name);   // 真正查到的节点的名字
                log.info(ctx.toString());  // 上下文参数,ctx传进来的东西
                log.info("create node success!");
            }
        }, "ctx" );
        Thread.sleep(1000);
    }

创建成功的日志

1671115922476.jpg


查看节点


// 同步方式查看节点数据,使用自定义的监听器
byte[] getData(final String path, Watcher watcher, Stat stat)
// 同步方式查看节点数据,使用连接时的监听器    
byte[] getData(String path, boolean watch, Stat stat)
// 异步方式查看节点,使用自定义的监听器
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// 异步方式查看节点,使用注册的连接器
void getData(String path, boolean watch, DataCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
watcher 注册一个监听器
watch 是否使用连接对象中注册的监视器
stat 返回znode的元数据
callBack 异步回调接口
ctx 传递上下文参数

代码

@Test
    public void testGet() throws InterruptedException, KeeperException {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/node1", false, stat);
        log.info("获取到的数据是:" + new String(data));
        log.info("当前节点的版本:" + stat.getVersion());
    }
    @Test
    public void testGetAsync() throws InterruptedException, KeeperException {
        zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
                log.info("rc: " + rc);
                log.info(path);
                log.info(new String(bytes));
                log.info("version: " + stat.getVersion());
            }
        }, null);
        Thread.sleep(1000);
    }


更新节点


// 同步方式更新节点
Stat setData(final String path, byte[] data, int version)
// 异步方式更新节点
void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
data 更新的数据
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testSetData() throws InterruptedException, KeeperException {
        Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1);  // 返回状态信息
        log.info(stat.toString());  // 将状态信息打印
        log.info("当前版本号" + stat.getVersion());
        log.info("节点创建时间" + stat.getCtime());
        log.info("节点修改时间" + stat.getMtime());
    }
    @Test
    public void testSetDataAsync() throws InterruptedException, KeeperException {
        zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                log.info("rc" + rc);  // 0 代表修改成功
                log.info(path); // 输入的节点路径
                log.info("version " + stat.getVersion()); // 当前版本
            }
        }, null);  // 返回状态信息
        Thread.sleep(1000);
    }


删除节点


// 同步方式删除节点
void delete(final String path, int version)
// 异步方式删除节点
void delete(final String path, int version, VoidCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testDelete() throws InterruptedException, KeeperException {
        zooKeeper.delete("/node1", -1);  // 如果节点不存在,会删除失败
        zooKeeper.delete("/node5/child1", -1);  // 如果节点不存在,会删除失败
    }
    @Test
    public void testDeleteAsync() {
        zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                log.info("rc:" + rc);
                log.info(path);
            }
        }, "ctx");
    }


查看子节点


//同步方式查看子节点,传入监听器
List<String> getChildren(final String path, Watcher watcher)
//同步方式查看子节点,是否使用默认的监听器    
List<String> getChildren(String path, boolean watch)
//异步方式查看子节点,传入监听器    
void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)  
//异步方式查看子节点,是否使用默认的监听器  
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)     

代码:

private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    @Test
    public void testGetChild() throws InterruptedException, KeeperException {
        syncCreateNode("/a", "hello");
        syncCreateNode("/a/b", "hello");
        syncCreateNode("/a/c", "hello");
        List<String> children = zooKeeper.getChildren("/a", false);
        log.info("********* children: [{}]", children);
    }
复制代码

结果:

1671115971544.jpg


检查节点是否存在


// 同步方式检查节点是否存在, 传入监听器
 Stat exists(final String path, Watcher watcher)
 // 同步方式检查节点是否存在, 是否用默认监听器
 Stat exists(String path, boolean watch)    
 // 异步方式检查节点是否存在, 传入监听器    
 void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)    
 // 异步方式检查节点是否存在, 是否用默认监听器     
 void exists(String path, boolean watch, StatCallback cb, Object ctx)     

代码:

@Test
    public void testExist() throws InterruptedException, KeeperException {
        syncCreateNode("/alvin", "hello");
        Stat stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
        log.info("delete node /alvin ......");
        zooKeeper.delete("/alvin", -1);
        stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
    }


监听器代码验证


getData、exist是、getChildren三个方法都可以监听对应节点变化。

1671115992804.jpg


验证watch的一次性


  1. 创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
    @Test
    public void testGetWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        byte[] data = zooKeeper.getData("/watch", true, new Stat());
        log.info("getData: [{}]", new String(data));
        Thread.sleep(100000L);
    }
  1. 多次执行更新节点的操作
// 修改数据
    @Test
    public void testUpdateData() throws InterruptedException, KeeperException {
        // 创建临时节点
        zooKeeper.setData("/watch", "bbb".getBytes(), -1);
        Thread.sleep(10000L);
    }
  1. 查看结果, 日志只打印了一次

1671116008446.jpg


通过自定义监听器多次监听


  1. 通过exists创建自定义监听
@Test
    public void testExists() throws KeeperException, InterruptedException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        // 重复使用,用完再注册一个新的
        Stat stat = zooKeeper.exists("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", watchedEvent.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", watchedEvent.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", watchedEvent.getPath());
                        break;
                }
                try {
                    // 重复监听的关键
                    zooKeeper.exists("/watch", this);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        if (stat != null) {
            log.info("version: " + stat.getVersion());
        }
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果,多次响应监听

1671116022429.jpg


通过addWatcher方法实现多次监听


  1. 通过addWatch添加监听器,addWatch方法支持重复监听
@Test
    public void testAddWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        zooKeeper.addWatch("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                switch (event.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", event.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", event.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", event.getPath());
                        break;
                }
            }
        }, AddWatchMode.PERSISTENT);
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果

1671116039880.jpg

说明:

  • zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
  • addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
96 2
|
3月前
|
Cloud Native API
微服务引擎 MSE 及云原生 API 网关 2024 年 9 月产品动态
微服务引擎 MSE 及云原生 API 网关 2024 年 9 月产品动态。
|
3月前
|
Java
让星星⭐月亮告诉你,自定义定时器和Java自带原生定时器
定时器是一种可以设置多个具有不同执行时间和间隔的任务的工具。本文介绍了定时器的基本概念、如何自定义实现一个定时器,以及Java原生定时器的使用方法,包括定义定时任务接口、实现任务、定义任务处理线程和使用Java的`Timer`与`TimerTask`类来管理和执行定时任务。
69 3
|
17天前
|
JSON Java Apache
Java基础-常用API-Object类
继承是面向对象编程的重要特性,允许从已有类派生新类。Java采用单继承机制,默认所有类继承自Object类。Object类提供了多个常用方法,如`clone()`用于复制对象,`equals()`判断对象是否相等,`hashCode()`计算哈希码,`toString()`返回对象的字符串表示,`wait()`、`notify()`和`notifyAll()`用于线程同步,`finalize()`在对象被垃圾回收时调用。掌握这些方法有助于更好地理解和使用Java中的对象行为。
|
30天前
|
Cloud Native API 微服务
微服务引擎 MSE 及云原生 API 网关 2024 年 11 月产品动态
微服务引擎 MSE 及云原生 API 网关 2024 年 11 月产品动态。
|
1月前
|
算法 Java API
如何使用Java开发获得淘宝商品描述API接口?
本文详细介绍如何使用Java开发调用淘宝商品描述API接口,涵盖从注册淘宝开放平台账号、阅读平台规则、创建应用并申请接口权限,到安装开发工具、配置开发环境、获取访问令牌,以及具体的Java代码实现和注意事项。通过遵循这些步骤,开发者可以高效地获取商品详情、描述及图片等信息,为项目和业务增添价值。
65 10
|
1月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 11 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
1月前
|
Java API 开发者
Java中的Lambda表达式与Stream API的协同作用
在本文中,我们将探讨Java 8引入的Lambda表达式和Stream API如何改变我们处理集合和数组的方式。Lambda表达式提供了一种简洁的方法来表达代码块,而Stream API则允许我们对数据流进行高级操作,如过滤、映射和归约。通过结合使用这两种技术,我们可以以声明式的方式编写更简洁、更易于理解和维护的代码。本文将介绍Lambda表达式和Stream API的基本概念,并通过示例展示它们在实际项目中的应用。
|
2月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 10 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
2月前
|
安全 Java API
Java中的Lambda表达式与Stream API的高效结合####
探索Java编程中Lambda表达式与Stream API如何携手并进,提升数据处理效率,实现代码简洁性与功能性的双重飞跃。 ####
33 0
下一篇
开通oss服务