Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Zookeeper系列(四)——Zookeeper原生JAVA API使用详解

概述


Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。


API介绍


建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。

本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。


ZooKeeper构造方法


可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

  • connectString: Zookeeper服务器的地址,多个地址用逗号分隔
  • sessionTimeout:超时时间
  • watcher:设置默认监听器


ZooKeeper常用方法


ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。

方法 描述
create(String path, byte[] data, List acl, CreateMode createMode) 同步方式创建节点
create(String path, byte[] data, List acl, CreateMode createMode) 异步方式创建节点
delete(String path, int version) 同步方式删除节点
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) 异步方式删除节点
exists(String path, boolean watch) 返回指定路径的节点状态信息,如果不存在返回null
getChildren(String path, boolean watch) 返回指定路径的所有子节点状态信息
getData(String path, boolean watch, Stat stat) 返回指定路径的节点数据和状态信息
setData(String path, byte[] data, int version) 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值


环境准备


  1. 引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>
       <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.24</version>
          <scope>compile</scope>
      </dependency>
        <!--junit测试依赖-->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.5.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
  1. 我们采用junit的方式演示,junit的常用注解作用如下:
  • @BeforeClass – 表示在类中的任意public static void方法执行之前执行
  • @AfterClass – 表示在类中的任意public static void方法执行之后执行
  • @Before – 表示在任意使用@Test注解标注的public void方法执行之前执行
  • @After– 表示在任意使用@Test注解标注的public void方法执行之后执行
  • @Test – 使用该注解标注的public void方法会表示为一个测试方法


测试案例


创建会话和关闭会话


可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。

1671115891098.jpg

参数 说明
connectString 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。可以指定客户端连上connectString中服务器后的根目录,如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为"/foo/bar"的节点,实际该节点的路径为"/app/a/foo/bar"
sessionTimeout 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。
  • 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
  • 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。

代码:

private static final String ZK_ADDR = "10.100.1.14:2181";
    private static final Integer ZK_SESSION_TIMEOUT = 30000;
    private ZooKeeper zooKeeper = null;
    @Before
    public void init() throws IOException, InterruptedException {
        log.info("********************** start zk ..................");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {
            log.info("触发了事件:[{}]", event);
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }
    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
        log.info("************************ close zk ..................");
    }
  • init方法和close方法是用来创建和关闭zk会话,加了@Before@After注解,它会在每个测试用例前后执行。
  • 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。


创建节点


创建节点有同步和异步两种方式。

create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)

说明:

  • 该方法是一个同步创建节点的方法

参数说明:

参数 说明
path znode路径。例如,/path, /app/node
data 存储到znode路径的数据,byte数组,最大1M
acl 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限
  • ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限
  • ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限
  • ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户
  • ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。
  • 我们也可以自己定义权限模式                                                                                  | | createMode | 节点的类型,是一个枚举。-   PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。
  • PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。
  • EPHEMERAL:临时节点,会随着会话的结束而自动删除。
  • EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。
  • CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
  • PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。
  • PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |

create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)

说明:

  • 这是一个异步创建节点的方法

参数说明:

其他参数和上面同步创建节点一样.

参数 说明
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testCreate() throws InterruptedException, KeeperException {
        // 创建一个持久节点,对所有用户开放
        zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 创建一个临时的有序节点,权限模式为对指定ip开放
        Id ip = new Id("ip", "10.100.1.100");
        zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    // 异步创建节点
    @Test
    public void testCreateAsync() throws InterruptedException {
        zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,  new AsyncCallback.StringCallback(){
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                log.info("rc: [{}]", rc);  // 0代表成功了
                log.info(path);  // 传进来的,添加的节点
                log.info(name);   // 真正查到的节点的名字
                log.info(ctx.toString());  // 上下文参数,ctx传进来的东西
                log.info("create node success!");
            }
        }, "ctx" );
        Thread.sleep(1000);
    }

创建成功的日志

1671115922476.jpg


查看节点


// 同步方式查看节点数据,使用自定义的监听器
byte[] getData(final String path, Watcher watcher, Stat stat)
// 同步方式查看节点数据,使用连接时的监听器    
byte[] getData(String path, boolean watch, Stat stat)
// 异步方式查看节点,使用自定义的监听器
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// 异步方式查看节点,使用注册的连接器
void getData(String path, boolean watch, DataCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
watcher 注册一个监听器
watch 是否使用连接对象中注册的监视器
stat 返回znode的元数据
callBack 异步回调接口
ctx 传递上下文参数

代码

@Test
    public void testGet() throws InterruptedException, KeeperException {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/node1", false, stat);
        log.info("获取到的数据是:" + new String(data));
        log.info("当前节点的版本:" + stat.getVersion());
    }
    @Test
    public void testGetAsync() throws InterruptedException, KeeperException {
        zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
                log.info("rc: " + rc);
                log.info(path);
                log.info(new String(bytes));
                log.info("version: " + stat.getVersion());
            }
        }, null);
        Thread.sleep(1000);
    }


更新节点


// 同步方式更新节点
Stat setData(final String path, byte[] data, int version)
// 异步方式更新节点
void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx) 

参数说明:

参数 说明
path znode路径
data 更新的数据
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testSetData() throws InterruptedException, KeeperException {
        Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1);  // 返回状态信息
        log.info(stat.toString());  // 将状态信息打印
        log.info("当前版本号" + stat.getVersion());
        log.info("节点创建时间" + stat.getCtime());
        log.info("节点修改时间" + stat.getMtime());
    }
    @Test
    public void testSetDataAsync() throws InterruptedException, KeeperException {
        zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                log.info("rc" + rc);  // 0 代表修改成功
                log.info(path); // 输入的节点路径
                log.info("version " + stat.getVersion()); // 当前版本
            }
        }, null);  // 返回状态信息
        Thread.sleep(1000);
    }


删除节点


// 同步方式删除节点
void delete(final String path, int version)
// 异步方式删除节点
void delete(final String path, int version, VoidCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

@Test
    public void testDelete() throws InterruptedException, KeeperException {
        zooKeeper.delete("/node1", -1);  // 如果节点不存在,会删除失败
        zooKeeper.delete("/node5/child1", -1);  // 如果节点不存在,会删除失败
    }
    @Test
    public void testDeleteAsync() {
        zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                log.info("rc:" + rc);
                log.info(path);
            }
        }, "ctx");
    }


查看子节点


//同步方式查看子节点,传入监听器
List<String> getChildren(final String path, Watcher watcher)
//同步方式查看子节点,是否使用默认的监听器    
List<String> getChildren(String path, boolean watch)
//异步方式查看子节点,传入监听器    
void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)  
//异步方式查看子节点,是否使用默认的监听器  
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)     

代码:

private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    @Test
    public void testGetChild() throws InterruptedException, KeeperException {
        syncCreateNode("/a", "hello");
        syncCreateNode("/a/b", "hello");
        syncCreateNode("/a/c", "hello");
        List<String> children = zooKeeper.getChildren("/a", false);
        log.info("********* children: [{}]", children);
    }
复制代码

结果:

1671115971544.jpg


检查节点是否存在


// 同步方式检查节点是否存在, 传入监听器
 Stat exists(final String path, Watcher watcher)
 // 同步方式检查节点是否存在, 是否用默认监听器
 Stat exists(String path, boolean watch)    
 // 异步方式检查节点是否存在, 传入监听器    
 void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)    
 // 异步方式检查节点是否存在, 是否用默认监听器     
 void exists(String path, boolean watch, StatCallback cb, Object ctx)     

代码:

@Test
    public void testExist() throws InterruptedException, KeeperException {
        syncCreateNode("/alvin", "hello");
        Stat stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
        log.info("delete node /alvin ......");
        zooKeeper.delete("/alvin", -1);
        stat = zooKeeper.exists("/alvin", false);
        log.info("stat: [{}]", stat);
    }


监听器代码验证


getData、exist是、getChildren三个方法都可以监听对应节点变化。

1671115992804.jpg


验证watch的一次性


  1. 创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
    @Test
    public void testGetWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        byte[] data = zooKeeper.getData("/watch", true, new Stat());
        log.info("getData: [{}]", new String(data));
        Thread.sleep(100000L);
    }
  1. 多次执行更新节点的操作
// 修改数据
    @Test
    public void testUpdateData() throws InterruptedException, KeeperException {
        // 创建临时节点
        zooKeeper.setData("/watch", "bbb".getBytes(), -1);
        Thread.sleep(10000L);
    }
  1. 查看结果, 日志只打印了一次

1671116008446.jpg


通过自定义监听器多次监听


  1. 通过exists创建自定义监听
@Test
    public void testExists() throws KeeperException, InterruptedException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        // 重复使用,用完再注册一个新的
        Stat stat = zooKeeper.exists("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", watchedEvent.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", watchedEvent.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", watchedEvent.getPath());
                        break;
                }
                try {
                    // 重复监听的关键
                    zooKeeper.exists("/watch", this);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        if (stat != null) {
            log.info("version: " + stat.getVersion());
        }
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果,多次响应监听

1671116022429.jpg


通过addWatcher方法实现多次监听


  1. 通过addWatch添加监听器,addWatch方法支持重复监听
@Test
    public void testAddWatch() throws InterruptedException, KeeperException {
        // 创建临时节点
        syncCreateNode("/watch", "aaa");
        zooKeeper.addWatch("/watch", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                switch (event.getType()) {
                    case NodeCreated:
                        log.info("{}节点创建了", event.getPath());
                        break;
                    case NodeDataChanged:
                        log.info("{}节点数据被修改了", event.getPath());
                        break;
                    case NodeDeleted:
                        log.info("{}节点被删除了", event.getPath());
                        break;
                }
            }
        }, AddWatchMode.PERSISTENT);
        Thread.sleep(100000);
    }
  1. 多次执行更新节点、删除节点、创建节点的操作
  2. 查看结果

1671116039880.jpg

说明:

  • zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
  • addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3天前
|
Java API
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
13 2
|
3天前
|
SQL Java API
Java一分钟之-JPA查询:JPQL与Criteria API
【6月更文挑战第14天】本文探讨了Java Persistence API (JPA)中的两种查询方式:JPQL和Criteria API。JPQL是面向对象的SQL,适用于简单查询,而Criteria API则提供类型安全的动态查询构造。文章指出了每种方法的常见问题和避免策略,如混淆实体属性与数据库字段、参数绑定错误、过度复杂化和性能问题。建议开发者根据需求选择适当的方法,并关注查询的可读性、可维护性和性能优化。
18 2
|
4天前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 API 网关 2024 年 05 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
4天前
|
网络协议 JavaScript 前端开发
Java一分钟之-GraalVM Native Image:构建原生可执行文件
【6月更文挑战第13天】GraalVM Native Image是Java开发的创新技术,它将应用编译成独立的原生可执行文件,实现快速启动和低内存消耗,对微服务、桌面应用和嵌入式系统有重大影响。本文讨论了如何使用Native Image,包括常见挑战如反射与动态类加载、静态初始化问题和依赖冲突,并提供了解决方案和代码示例。通过合理规划和利用GraalVM工具,开发者可以克服这些问题,充分利用Native Image提升应用性能。
31 5
|
4天前
|
API
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态。
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态
|
5天前
|
分布式计算 自然语言处理 大数据
【大数据】MapReduce JAVA API编程实践及适用场景介绍
【大数据】MapReduce JAVA API编程实践及适用场景介绍
15 0
|
5天前
|
Java 大数据 API
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
33 0
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
|
5天前
|
数据可视化 Java API
【JAVA】javadoc,如何生成标准的JAVA API文档
【JAVA】javadoc,如何生成标准的JAVA API文档
6 0
|
1月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
416 2
|
1月前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
417 0

热门文章

最新文章