ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)

what

Curator(监护人;管理者) 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java客户端。

use

  1. 依赖
    <dependencies>
    <!--Curator 相关依赖-->
    <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>4.0.0</version>
    </dependency>
    <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>4.0.0</version>
    </dependency>
    <dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.13</version>
    </dependency>
    <!--单元测试相关依赖-->
    <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>4.12</version>
    </dependency>
    </dependencies>
    
  2. 基础API测试类 BasicOperationTest.java
public class BasicOperationTest {
   
    private CuratorFramework client = null;
    /**
     * zookeeper服务器地址
     */
    private static final String ZK_SERVER_PATH = "xxx.xx.xxx.xxx:2181";
    private static final String NODE_PATH = "/hadoop/yarn";

    @Before
    public void prepare() {
   
        // 重试策略
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(ZK_SERVER_PATH)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        //指定命名空间后,client 的所有路径操作都会以 / workspace 开头
        client.start();
    }

    /**
     * 判断服务状态
     */
    @Test
    public void getStatus() {
   
        CuratorFrameworkState state = client.getState();
        System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
    }

    /**
     * 创建节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void createNodes() throws Exception {
   
        byte[] data = "abc".getBytes();
        client.create().creatingParentsIfNeeded()
                //节点类型
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(NODE_PATH, data);
    }

    /**
     * 获取节点信息
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void getNode() throws Exception {
   
        Stat stat = new Stat();
        byte[] data = client.getData().storingStatIn(stat).forPath(NODE_PATH);
        System.out.println("节点数据:" + new String(data));
        System.out.println("节点信息:" + stat.toString());
    }

    /**
     * 获取子节点列表
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void getChildrenNodes() throws Exception {
   
        List<String> childNodes = client.getChildren().forPath("/hadoop");
        for (String s : childNodes) {
   
            System.out.println(s);
        }
    }

    /**
     * 更新节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void updateNode() throws Exception {
   
        byte[] newData = "defg".getBytes();
        // 传入版本号,如果版本号错误则拒绝更新操作,并抛出 BadVersion 异常
        client.setData().withVersion(0)
                .forPath(NODE_PATH, newData);
    }

    /**
     * 删除节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void deleteNodes() throws Exception {
   
        client.delete()
                // 如果删除失败,那么在会继续执行,直到成功
                .guaranteed()
                // 如果有子节点,则递归删除
                .deletingChildrenIfNeeded()
                // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
                .withVersion(0)
                .forPath(NODE_PATH);
    }

    /**
     * 判断节点是否存在
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void existNode() throws Exception {
   
        // 如果节点存在则返回其状态信息如果不存在则为 null
        Stat stat = client.checkExists().forPath(NODE_PATH + "aa/bb/cc");
        System.out.println("节点是否存在:" + (stat != null));
    }

    /**
     * 创建一次监听
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void DisposableWatch() throws Exception {
   
        client.getData().usingWatcher(new CuratorWatcher() {
   
            @Override
            public void process(WatchedEvent event) {
   
                System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
            }
        }).forPath(NODE_PATH);
        //休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }

    /**
     * 创建永久监听
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void permanentWatch() throws Exception {
   
        // 使用 NodeCache 包装节点,对其注册的监听作用于节点,且是永久性的
        final NodeCache nodeCache = new NodeCache(client, NODE_PATH);
        // 通常设置为 true, 代表创建 nodeCache 时,就去获取对应节点的值并缓存
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
   
            @Override
            public void nodeChanged() {
   
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
   
                    System.out.println("节点路径:" + currentData.getPath() +
                            "数据:" + new String(currentData.getData()));
                }
            }
        });
        // 休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }

    /**
     * 监听字节点
     *
     * @throws Exception 可能出现异常
     */
    @Test
    public void permanentChildrenNodesWatch() throws Exception {
   
        // 第三个参数代表除了节点状态外,是否还缓存节点内容
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop",
                true);
        /*
         * StartMode 代表初始化方式:
         * NORMAL: 异步初始化
         * BUILD_INITIAL_CACHE: 同步初始化
         * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前数据节点的子节点列表:");
        childDataList.forEach(x -> System.out.println(x.getPath()));
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
   
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
                    event) {
   
                switch (event.getType()) {
   
                    case INITIALIZED:
                        System.out.println("childrenCache 初始化完成");
                        break;
                    case CHILD_ADDED:
                        // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
                        System.out.println("增加子节点:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("被修改的子节点的路径:" +
                                event.getData().getPath());
                        System.out.println("修改后的数据:" + new
                                String(event.getData().getData()));
                        break;
                    default:
                        System.out.println("无匹配!");
                }
            }
        });
        //休眠以观察测试效果
        Thread.sleep(1000 * 1000);
    }

    @After
    public void destroy() {
   
        if (client != null) {
   
            client.close();
        }
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
8天前
|
消息中间件 算法 Java
客户端限流器和服务端API限流器的区别
客户端限流器和服务端API限流器在限流对象、实现方式以及应用场景等方面存在显著差异。客户端限流器主要关注于保护客户端资源和控制客户端行为,而服务端API限流器则更注重于保护服务端系统和确保服务的高可用性。
21 3
|
9天前
|
存储 缓存 监控
Java一分钟之-Apache Ignite:分布式内存计算平台
【5月更文挑战第21天】Apache Ignite是一款开源的分布式内存计算平台,涉及内存数据网格、流处理和计算服务。本文关注其常见问题,如数据丢失、分区不均、内存管理和网络延迟。为保证数据一致性,建议使用适当的數據模式和备份策略,实现数据持久化。优化内存配置和监控网络可提升性能与稳定性。提供的Java代码示例展示了如何创建分区缓存并设置备份。正确配置和管理Ignite是构建高可用、高性能应用的关键,持续监控集群状态至关重要。
26 0
|
9天前
|
缓存 监控 Java
Java一分钟之-Apache Geode:分布式内存数据平台
【5月更文挑战第21天】Apache Geode是低延迟的分布式内存数据平台,用于构建实时应用,提供缓存、数据库和消息传递功能。本文聚焦于Geode的常见问题,如数据一致性(数据同步延迟和分区冲突)和性能瓶颈(网络延迟和资源管理不当),并提出解决方案。确保数据一致性可通过选择合适的数据策略和利用`InterestPolicy`、`CacheListener`;提升性能则需优化网络和合理配置资源。通过示例代码展示了如何创建和操作Geode的Region。正确配置和调优Geode对于实现高可用、高性能应用至关重要。
27 1
|
15天前
|
网络协议 Dubbo Java
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
15 0
|
15天前
|
数据采集 机器学习/深度学习 Java
数据猎手:使用Java和Apache HttpComponents库下载Facebook图像
本文介绍了如何使用Java和Apache HttpComponents库从Facebook获取图像数据。通过设置爬虫代理IP以避免限制,利用HttpClient发送请求,解析HTML找到图像链接,然后下载并保存图片。提供的Java代码示例展示了实现过程,包括创建代理配置、线程池,以及下载图片的逻辑。注意,实际应用需根据Facebook页面结构进行调整。
数据猎手:使用Java和Apache HttpComponents库下载Facebook图像
|
15天前
|
JSON 测试技术 API
Python的Api自动化测试使用HTTP客户端库发送请求
【4月更文挑战第18天】在Python中进行HTTP请求和API自动化测试有多个库可选:1) `requests`是最流行的选择,支持多种请求方法和内置JSON解析;2) `http.client`是标准库的一部分,适合需要低级别控制的用户;3) `urllib`提供URL操作,适用于复杂请求;4) `httpx`拥有类似`requests`的API,提供现代特性和异步支持。根据具体需求选择,如多数情况`requests`已足够。
20 3
|
7天前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7天前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
316 1
Apache Flink CDC 3.1.0 发布公告
|
7天前
|
Java 关系型数据库 数据库连接
实时计算 Flink版操作报错之遇到错误org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc',该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
78 0

推荐镜像

更多