ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)

简介: ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)

what

Curator(监护人;管理者) 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java客户端。

use

  1. 依赖
<dependencies>
  <!--Curator 相关依赖-->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.13</version>
  </dependency>
  <!--单元测试相关依赖-->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>
</dependencies>
  1. 基础API测试类 BasicOperationTest.java
public class BasicOperationTest {
    private CuratorFramework client = null;
    /**
     * zookeeper服务器地址
     */
    private static final String ZK_SERVER_PATH = "xxx.xx.xxx.xxx:2181";
    private static final String NODE_PATH = "/hadoop/yarn";
    @Before
    public void prepare() {
        // 重试策略
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(ZK_SERVER_PATH)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        //指定命名空间后,client 的所有路径操作都会以 / workspace 开头
        client.start();
    }
    /**
     * 判断服务状态
     */
    @Test
    public void getStatus() {
        CuratorFrameworkState state = client.getState();
        System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
    }
    /**
     * 创建节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void createNodes() throws Exception {
        byte[] data = "abc".getBytes();
        client.create().creatingParentsIfNeeded()
                //节点类型
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(NODE_PATH, data);
    }
    /**
     * 获取节点信息
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void getNode() throws Exception {
        Stat stat = new Stat();
        byte[] data = client.getData().storingStatIn(stat).forPath(NODE_PATH);
        System.out.println("节点数据:" + new String(data));
        System.out.println("节点信息:" + stat.toString());
    }
    /**
     * 获取子节点列表
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void getChildrenNodes() throws Exception {
        List<String> childNodes = client.getChildren().forPath("/hadoop");
        for (String s : childNodes) {
            System.out.println(s);
        }
    }
    /**
     * 更新节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void updateNode() throws Exception {
        byte[] newData = "defg".getBytes();
        // 传入版本号,如果版本号错误则拒绝更新操作,并抛出 BadVersion 异常
        client.setData().withVersion(0)
                .forPath(NODE_PATH, newData);
    }
    /**
     * 删除节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void deleteNodes() throws Exception {
        client.delete()
                // 如果删除失败,那么在会继续执行,直到成功
                .guaranteed()
                // 如果有子节点,则递归删除
                .deletingChildrenIfNeeded()
                // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
                .withVersion(0)
                .forPath(NODE_PATH);
    }
    /**
     * 判断节点是否存在
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void existNode() throws Exception {
        // 如果节点存在则返回其状态信息如果不存在则为 null
        Stat stat = client.checkExists().forPath(NODE_PATH + "aa/bb/cc");
        System.out.println("节点是否存在:" + (stat != null));
    }
    /**
     * 创建一次监听
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void DisposableWatch() throws Exception {
        client.getData().usingWatcher(new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
            }
        }).forPath(NODE_PATH);
        //休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }
    /**
     * 创建永久监听
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void permanentWatch() throws Exception {
        // 使用 NodeCache 包装节点,对其注册的监听作用于节点,且是永久性的
        final NodeCache nodeCache = new NodeCache(client, NODE_PATH);
        // 通常设置为 true, 代表创建 nodeCache 时,就去获取对应节点的值并缓存
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                    System.out.println("节点路径:" + currentData.getPath() +
                            "数据:" + new String(currentData.getData()));
                }
            }
        });
        // 休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }
    /**
     * 监听字节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void permanentChildrenNodesWatch() throws Exception {
        // 第三个参数代表除了节点状态外,是否还缓存节点内容
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop",
                true);
        /*
         * StartMode 代表初始化方式:
         * NORMAL: 异步初始化
         * BUILD_INITIAL_CACHE: 同步初始化
         * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点列表:");
        childDataList.forEach(x -> System.out.println(x.getPath()));
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
                    event) {
                switch (event.getType()) {
                    case INITIALIZED:
                        System.out.println("childrenCache 初始化完成");
                        break;
                    case CHILD_ADDED:
                        // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
                        System.out.println("增加子节点:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("被修改的子节点的路径:" +
                                event.getData().getPath());
                        System.out.println("修改后的数据:" + new
                                String(event.getData().getData()));
                        break;
                    default:
                        System.out.println("无匹配!");
                }
            }
        });
        //休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }
    @After
    public void destroy() {
        if (client != null) {
            client.close();
        }
    }
}
目录
相关文章
|
11月前
|
人工智能 Java API
MCP客户端调用看这一篇就够了(Java版)
本文详细介绍了MCP(Model Context Protocol)客户端的开发方法,包括在没有MCP时的痛点、MCP的作用以及如何通过Spring-AI框架和原生SDK调用MCP服务。文章首先分析了MCP协议的必要性,接着分别讲解了Spring-AI框架和自研SDK的使用方式,涵盖配置LLM接口、工具注入、动态封装工具等步骤,并提供了代码示例。此外,还记录了开发过程中遇到的问题及解决办法,如版本冲突、服务连接超时等。最后,文章探讨了框架与原生SDK的选择,认为框架适合快速构建应用,而原生SDK更适合平台级开发,强调了两者结合使用的价值。
13311 33
MCP客户端调用看这一篇就够了(Java版)
|
11月前
|
存储 网络协议 Java
Java获取客户端IP问题:返回127.0.0.1
总结:要解决Java获取客户端IP返回127.0.0.1的问题,首先要找出原因,再采取合适的解决方案。请参考上述方案来改进代码,确保在各种网络环境下都能正确获取客户端IP地址。希望本文对您有所帮助。
662 25
|
JSON NoSQL Java
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
这篇文章介绍了在Java中使用Redis客户端的几种方法,包括Jedis、SpringDataRedis和SpringBoot整合Redis的操作。文章详细解释了Jedis的基本使用步骤,Jedis连接池的创建和使用,以及在SpringBoot项目中如何配置和使用RedisTemplate和StringRedisTemplate。此外,还探讨了RedisTemplate序列化的两种实践方案,包括默认的JDK序列化和自定义的JSON序列化,以及StringRedisTemplate的使用,它要求键和值都必须是String类型。
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
1068 4
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
300 1
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1035 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
532 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
8月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
889 9
Apache Flink:从实时数据分析到实时AI
|
8月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
788 0