Apache ZooKeeper - 使用Apache Curator操作ZK

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Apache ZooKeeper - 使用Apache Curator操作ZK

20201202142259431.png20201129103937906.png

原生ZK API VS Curator


Apache ZooKeeper - 使用原生的API操作ZK

ZooKeeper原生Java API的不足之处:

  • 连接zk超时时,不支持自动重连,需要手动操作
  • Watch注册一次就会失效,需手工反复注册
  • 不支持递归创建节点
  • 异步支持,没有线程池

Apache curator:


  • 解决Watch注册一次就会失效的问题
  • API 更加简单易用、封装了常用的ZooKeeper工具类
  • 使用Curator实现比如分布式锁等需求更简单
  • 异步执行,支持自定义线程池


Curator是netflix公司开源的一套zookeeper客户端,Apache的顶级项目


与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量


Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等


Curator 概述

Apache Curator : https://curator.apache.org/


20201129103948960.png


看看模块


20201129111020307.png

  • curator-framework:对zookeeper的底层api的一些封装
  • curator-client:提供一些客户端的操作,例如重试策略等
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等



Maven依赖

     <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


会话创建

和客户端/ 服务器交互,第一步就要创建会话

Curator 提供了多种方式创建会话

静态工厂方式创建会话

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
        CuratorFramework client = CuratorFrameworkFactory.newClient(getConnectStr(), retryPolicy);
        client .start();

使用 fluent 风格创建会话

   RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
   curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr())
           .retryPolicy(retryPolicy)
           .sessionTimeoutMs(sessionTimeoutMs)
           .connectionTimeoutMs(connectionTimeoutMs)
           .canBeReadOnly(true)
           .build();
 curatorFramework.start();



上述代码采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。


在定义 CuratorFramework 对象实例的时候, 使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、connectionTimeoutMs 会话创建超时时间。


connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3:port3

retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误


策略名称 描述
ExponentialBackoffRetry 重试一组次数,重试之间的睡眠时间增加
RetryNTimes 重试最大次数
RetryOneTime 只重试一次
RetryUntilElapsed 在给定的时间结束之前重试


sessionTimeoutMs 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。


另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而connectionTimeoutMs 作用在客户端。


创建节点

    /**
     * 递归创建子节点
     */
    @SneakyThrows
    @Test
    public void testCreateWithParent()  {
        CuratorFramework curatorFramework = getCuratorFramework();
        String pathWithParent = "/artisan-node/artisan-node-sub1/artisan-node-sub1-1";
        String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
        log.info("curator create node :{}  successfully.", path);
    }


20201201210528671.png

使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息


protection 模式 ,规避僵尸节点

  /**
     * protection 模式,防止由于异常原因,导致僵尸节点
     * @throws Exception
     */
    @SneakyThrows
    @Test
    public void testCreate()  {
        CuratorFramework curatorFramework = getCuratorFramework();
        String forPath = curatorFramework
                .create()
                .withProtection()  // 防止僵尸节点
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
                        forPath("/curator-node", "data".getBytes());
        log.info("curator create node :{}  successfully.", forPath);
    }


20201202142259431.png

看下zk中的数据


20201202142350957.png


实现原理后面单独开篇解读,总体思想就是 随机生成一个UUID, 再创建之前客户端根据这个缓存的UUID去看ZK Server是否存在,存在则认为是成功的,否则就继续创建。


获取数据


 @Test
    public void testGetData() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        byte[] bytes = curatorFramework.getData().forPath("/curator-node");
        log.info("get data from  node :{}  successfully.", new String(bytes));
    }

通过客户端实例的getData() 方法更新 ZooKeeper 服务上的数据节点,在getData 方法的后边,通过 forPath 函数来指定查询的节点名称


修改数据


@Test
    public void testSetData() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());
        byte[] bytes = curatorFramework.getData().forPath("/curator-node");
        log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));
    }

通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。


删除数据 guaranteed()

  @Test
    public void testDelete() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        String pathWithParent = "/node-parent";
        curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
    }


guaranteed:主要起到一个保障删除成功的作用, 只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在ZooKeeper 服务端被删除。


deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。


获取子节点

     @Test
    public void testListChildren() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        String pathWithParent = "/artisan-node";
        List<String> list = curatorFramework.getChildren().forPath(pathWithParent);
        list.forEach(System.out::println); 
    }


通过客户端实例的 getChildren() 方法更新 ZooKeeper 服务上的数据节点,在getChildren方法的后边,通过 forPath 函数来指定节点下的一级子节点的名称


异步线程池


Curator 使用BackgroundCallback 接口处理服务器端返回来的信息。

如果在异步线程中调用,默认在 EventThread 线程中调用,支持自定义线程池

 /**
     * 使用默认的 EventThread异步线程处理
     * @throws Exception
     */
    @Test
    public void testThreadPoolByDefaultEventThread() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        String ZK_NODE="/artisan-node";
        curatorFramework.getData().inBackground((client, event) -> {
            log.info(" background: {}", new String(event.getData()));
        }).forPath(ZK_NODE);;
    }


20201202151456664.png

    /**
     * 使用自定义线程池
     * @throws Exception
     */
    @Test
    public void testThreadPoolByCustomThreadPool() throws Exception {
        CuratorFramework curatorFramework = getCuratorFramework();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        String ZK_NODE="/artisan-node";
        curatorFramework.getData().inBackground((client, event) -> {
            log.info(" background: {}", new String(event.getData()));
        },executorService).forPath(ZK_NODE);
     }

20201202151640585.png


20201202143818411.png


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
95 0
|
1月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
62 1
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
40 1
|
6月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
数据采集 分布式计算 Kubernetes
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
94 4
|
3月前
|
分布式计算 监控 Hadoop
详解 Apache ZooKeeper 和 Apache Oozie
【8月更文挑战第31天】
117 0
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
214 5
|
6月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
74 11
|
6月前
|
存储 Java 网络安全
ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
【4月更文挑战第10天】ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
79 1
|
6月前
|
存储 Java 网络安全
ZooKeeper【搭建 02】apache-zookeeper-3.6.0 集群版(准备+安装配置+启动验证)
【4月更文挑战第8天】ZooKeeper【搭建 02】apache-zookeeper-3.6.0 集群版(准备+安装配置+启动验证)
90 1

推荐镜像

更多
下一篇
无影云桌面