Zookeeper系列(五)——使用Curator访问Zookeeper详解

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Zookeeper系列(五)——使用Curator访问Zookeeper详解

概述


Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。

除此之外,Curator还提供了Zookeeper的各种应用场景:分布式栅栏、共享锁服务、Master选举机制和分布式计数器等。

官网地址:curator.apache.org/index.html


项目组件


名称 描述
Recipes Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。
Framework Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities 为Zookeeper提供的各种实用程序。
Client Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
Errors Curator如何处理错误,连接问题,可恢复的例外等。


Maven依赖


<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.1</version>
</dependency>
  • 我们只要引入curator-recipes,就会自动将其他的依赖引入。
  • 我们文章使用的是5.2.1的版本,它对应的zookeeper api版本是3.6.3。

image.png


使用案例


创建会话


Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类的newClient方法返回CuratorFramework,它有多个重载方法, 最后调用start方法创建连接。

  • CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
  • CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
  • CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)

查看3个方法的源码,发现最终他们都是使用建造者模式,统一调用了builder的.build方法。

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            zkClientConfig(zkClientConfig).
            build();
    }

参数说明:

参数 说明
connectString zk server的地址,多个server用逗号分隔
sessionTimeoutMs 会话超时时间,默认是60s
connectionTimeoutMs 连接超时时间,默认是15s
retryPolicy 失败重试策略
zkClientConfig zk客户端的配置,比如认证相关的配置

代码例子:

@Before
    public void init() {
        // 重试机制, 每个1秒钟重试1次,最多重试5次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5, 5000);
        // 定义客户端
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 30000, 60000, retryPolicy);
        // 建立连接
        curatorFramework.start();
    }
  • 重试机制RetryPolicy有很多内置的实现,可以看他的javadoc一目了然。
  • 最后需要调用start方法建立连接。


创建节点


Curator创建节点的方法也是基于Fluent风格编码,原生API中的参数很多都转化为一层层的方法调用来进行设置,直接上例子。

@Test
    public void testCreateNode() throws Exception {
        // 创建临时的,任何人都有权限的节点
        String result = curatorFramework.create().withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
        log.warn("创建节点: [{}]", result);
    }
  • creatingParentContainersIfNeeded方法用来递归创建父节点。
  • withMode方法可以设置节点的属性,是持久节点,临时节点,持久有序节点、临时有序节点等。
  • withACL方法可以设置节点的访问权限。
  • forPath方法设置节点的路径和对应值。
  • 更多方法使用可以自行参考java doc文档。


更新节点


@Test
    public void testUpdateNode() throws Exception {
        // 创建节点
        curatorFramework.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
        // 不带版本修改节点
        Stat stat = curatorFramework.setData().forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
        log.info("修改后节点版本: [{}]", stat.getVersion());
        // 带版本修改节点,版本不正确,报错
        stat = curatorFramework.setData().withVersion(10).forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
        log.info("修改后节点版本: [{}]", stat.getVersion());
    }

image.png

  • withVersion方法用来指定版本,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常,可以用来实现乐观锁。


查看节点


@Test
    public void testGetNode() throws Exception {
        // 创建节点
        curatorFramework.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
        // 直接查询
        byte[] data = curatorFramework.getData().forPath("/emNode");
        log.info("节点数据: [{}]", new String(data));
        // 同时获取节点属性,放到stat中
        Stat stat = new Stat();
        data = curatorFramework.getData().storingStatIn(stat).forPath("/emNode");
        log.info("节点数据: [{}]", new String(data));
        log.info("节点版本: [{}]", stat.getVersion());
        // 查询的时候注册监听
        data = curatorFramework.getData().usingWatcher(new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) throws Exception {
                log.info("事件监听: [{}]", event);
            }
        }).forPath("/emNode");
        log.info("节点数据: [{}]", new String(data));
        Thread.sleep(100000);
    }
  • getData()方法用来获取GetDataBuilder查询器。
  • storingStatIn方法用来存放查询到的节点属性信息。
  • usingWatcher方法用来注册监听器,这个方法只会一次监听。


删除节点


@Test
    public void testDeleteNode() throws Exception {
        // 创建节点
        curatorFramework.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
        // 方法
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/emNode");
    }
  • delete()用来创建删除构造器DeleteBuilder
  • deletingChildrenIfNeeded()删除节点并递归删除其子节点。
  • withVersion()指定版本删除,如果版本不对抛出异常
  • guaranteed()方法强制保证删除一个节点,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。


监听器使用


Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器,上面已经做了演示。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。

Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。以前的Cache事件监听的种类有3种Path Cache, Cache,Tree Cache, 在新版本中合并为一种CuratorCache, 我们以CuratorCache新版演示为例子。

CuratorCache试图将来自节点的数据保存在本地缓存中。还可以选择缓存该节点下的整个子树。将响应更新/创建/删除事件,下拉数据等。您可以注册侦听器,以便在发生更改时得到通知。

@Test
    public void testWatchCache() throws Exception {
        try {
            String workPath = "/aa/bb/cc";
            // 创建节点
            curatorFramework.create().creatingParentContainersIfNeeded()
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(workPath, "hello".getBytes(StandardCharsets.UTF_8));
            // 创建缓存
            CuratorCache curatorCache = CuratorCache.builder(curatorFramework, workPath).build();
            CuratorCacheListener curatorCacheListener = new CuratorCacheListener() {
                @Override
                public void event(Type type, ChildData oldData, ChildData data) {
                    log.info("%%%%%%%%%%%%%%%watch start……………………………………………………");
                    log.info("change type: [{}]", type);
                    log.info("ZNode节点状态改变, path={}", data.getPath());
                    log.info("ZNode节点状态改变,before: [{}], after: [{}]", oldData != null ? new String(oldData.getData()): null, new String(data.getData()));
                }
            };
            curatorCache.listenable().addListener(curatorCacheListener);
            curatorCache.start();
            // 第1次变更节点数据
            curatorFramework.setData().forPath("/aa/bb", "第1次更改内容".getBytes());
            Thread.sleep(1000);
            // 第2次变更节点数据
            curatorFramework.setData().forPath("/aa/bb/cc", "第2次更改内容".getBytes());
            Thread.sleep(1000);
            // 第3次创建新节点
            curatorFramework.create().creatingParentContainersIfNeeded()
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath("/aa/bb/dd", "hello".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
            Thread.sleep(5000);
        } catch (Exception e) {
            log.error("创建Cache监听失败", e);
        } finally {
            // 删除节点
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/aa");
        }
    }
  • 通过CuratorCache.builder方法创建CuratorCache,默认监听当前节点以及子节点。


小结


上面简单演示了下curator框架操作zookeeper,相对于原生的api,简洁方便很多,更多的使用可以自己参考 doc。

代码地址:github.com/alvinlkk/aw…

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5月前
|
存储 Java Linux
【Zookeeper】Introduction to Apache Curator
【Zookeeper】Introduction to Apache Curator
127 0
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
96 0
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
68 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
|
2月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
66 0
|
5月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
66 11
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
49 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
|
5月前
|
缓存 Java 容器
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
134 0
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)
|
5月前
|
安全 Java API
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
157 0
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)
|
5月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
59 0
|
安全 Java 数据安全/隐私保护
ZooKeeper 未授权访问漏洞利用
ZooKeeper 未授权访问漏洞利用
1729 1