Zookeeper客户端Curator使用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Zookeeper客户端Curator使用

 

image.gif编辑

👨🏻‍🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟

🌈擅长领域:Java、大数据、运维、电子

🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!

🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!

 


    • 使用的项目构建工具为Maven,使用坐标如下:
    <dependencies>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.13.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.13.0</version>
            </dependency>
      <!--google的工具类-->
            <dependency>
                <groupId>com.google.collections</groupId>
                <artifactId>google-collections</artifactId>
                <version>1.0</version>
            </dependency>
    <!--使用slf4j日志-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    <!--junit单元测试-->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    • image.gif
    • Curator包含的几个包:
      1. curator-framework:对zookeeper的底层api的一些封装
      2. curator-client:提供一些客户端的操作,例如重试策略等
      3. urator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

        Curator的基本API

        创建会话

          1. 使用静态方法创建客户端
          RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
          CuratorFramework client = CuratorFrameworkFactory.newClient(connect, 5000, 2000, retry);
          1. image.gif
            • newClient包含四个主要的参数:
              Untitled
            • 使用功Fluent风格的Api创建会话
            CuratorFramework client = CuratorFrameworkFactory
                            .builder()
                            .connectString(connect)
                            .sessionTimeoutMs(5000)
                            .connectionTimeoutMs(3000)
                            .retryPolicy(retry)
                            .build();
            • image.gif
            • 创建包含隔离命名空间的会话
              为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径,例如(下面的例子)当客户端指定了独立命名空间为“/mybase”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的
            RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
                    CuratorFramework client = CuratorFrameworkFactory
                            .builder()
                            .connectString(connect)
                            .sessionTimeoutMs(5000)
                            .connectionTimeoutMs(3000)
                            .retryPolicy(retry)
                            .namespace("mybase")
                            .build();
            • image.gif
            • 启动客户端
              • 当创建会话成功,得到client实例然后可以直接调用其的start()方法打开客户端
              client.start();
              • image.gif
                • 数据节点操作创建数据节点Zookeeper的节点创建模式:
                  1. PERSISTENT:持久化
                  2. PERSISTENT_SQUENTIAL:持久化并带有序列号
                  3. EPHEMERAL:临时
                  4. EPHEMERAL_SQUENTIAL:临时并带有序列号
                    • 创建一个节点,初始化内容为空
                    client.create().forPath("path");
                    //如果没有指定节点属性,节点创建模式默认为持久化节点,内容默认为空
                    • image.gif
                    • 创建一个节点,附带初始化内容
                    client.create().forPath("path","init".getBytes());
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),内容为空
                    client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),附带初始化内容
                    client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes);
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点。
                    client.create()
                      .creatingParentContainersIfNeeded()
                      .withMode(CraeteMOde.EPHEMERAL)
                      .forPath("path","init".getBytes);
                    • image.gif
                      • 这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
                        • 删除数据节点
                          • 删除一个节点
                            client.delete().forPath("path");
                            //注意:此方法只能删除叶子节点,否则会抛出异常
                            • image.gif
                              • 删除一个节点,并且递归删除其他所有的子节点
                              client.delete().deletingChildrenIfNeeded().forPath("path);
                              • image.gif
                              • 删除一个节点,强制指定版本进行删除
                              client.delete().withVersion(10086).forPath("path");
                              • image.gif
                              • 删除一个节点,强制保证删除
                              client.delete().guaranteed().forPath("path");
                              • image.gif
                                • guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
                                  • 读取数据节点数据
                                    • 读取一个数据节点的数据内容
                                    client.getData().forPath("path");
                                    • image.gif
                                      • 注意,此方法返回值是byte[]
                                        • 读取一个节点的数据内容,同时获取到该节点的stat
                                        Stat stat=new Stat();
                                        Client.getData().storingStatIn(stat).forPath("path");
                                        • image.gif
                                        • 更新数据节点
                                          • 更新一个节点的数据内容
                                          client.setData().forPath("path","data".getBytes());
                                          • image.gif
                                            • 注意:该接口会返回一个Stat实例
                                              • 更新一个节点的数据内容,强制指定版本进行更新
                                              client.setData().withVersion(10086).forPath("path","data".getBytes());
                                              • image.gif
                                              • 检查节点时候存在
                                              client.checkExists().forPath("path");
                                              • image.gif
                                                • 注意:该方法返回一个Stat实例,用于检查Znode是否存在的操作,可以协调额外的方法监控或者后台处理)并在最后调用forPath()指定要操作的Znode
                                                  • 获取某个节点的所有子节点路径
                                                  client.getChildren.forPath("path");
                                                  • image.gif
                                                    • 注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
                                                      • 事务
                                                        • CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交
                                                          client.inTransaction().check().forPath("path")
                                                                .and()
                                                                .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
                                                                .and()
                                                                .setData().withVersion(10086).forPath("path","data2".getBytes())
                                                                .and()
                                                                .commit();
                                                          • image.gif
                                                            • 异步接口
                                                              • 创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息image.gif编辑
                                                                • 响应吗(#getResultCode())image.gif编辑
                                                                  • 异步节点创建法:
                                                                  Executor executor = Executors.newFixedThreadPool(2);
                                                                  client.create()
                                                                        .creatingParentsIfNeeded()
                                                                        .withMode(CreateMode.EPHEMERAL)
                                                                        .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
                                                                        },executor)
                                                                        .forPath("path");
                                                                  • image.gif
                                                                    • 注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
                                                                      • 实验demo
                                                                      private final String connect="192.168.123.72:2180,192.168.123.73:2180,192.168.123.74:2180";
                                                                          /**
                                                                           * 在zookeeper的/目录下创建节点
                                                                           * @throws Exception
                                                                           */
                                                                          @Test
                                                                          public void writeZookeeperAPITest() throws Exception {
                                                                            RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(3000)
                                                                                      .retryPolicy(retry)
                                                                                      .namespace("mybase")
                                                                                      .build();
                                                                              client.start();
                                                                              client.create()
                                                                                      .creatingParentContainersIfNeeded()
                                                                                      .withMode(CreateMode.PERSISTENT)
                                                                                      .forPath("/zoomdem"+new SimpleDateFormat("hh:mm:ss").format(new Date()),new SimpleDateFormat("yyyy-MM-dd").format(new Date())
                                                                                              .getBytes());
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 读取zookeeper节点下的携带参数
                                                                           * @throws Exception
                                                                           */
                                                                          @Test
                                                                          public void readZookeeperAPITest() throws Exception {
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(2000,3);
                                                                              CuratorFramework client
                                                                                      = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(3000)
                                                                                      .connectionTimeoutMs(4000)
                                                                                      .namespace("thisday15")
                                                                                      .retryPolicy(Retry)
                                                                                      .build();
                                                                              client.start();
                                                                              byte[] thisday15s = client.getData().forPath("/hellozoom");
                                                                              System.out.println(new String(thisday15s));
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 删除Zookeeper集群中的一个节点,每一个znode都有一个与之对应stat结构
                                                                           *  @thows Exception
                                                                           */
                                                                          @Test
                                                                          public void deleteZookeeperAPITest() throws Exception{
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(5000, 2200);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .namespace("thisday15")
                                                                                      .retryPolicy(Retry)
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .build();
                                                                              client.start();
                                                                              System.out.println(client.checkExists().forPath("/hellozoom").toString());
                                                                              client.delete().forPath("/hellozoom");
                                                                              System.out.println(client.checkExists().forPath("/hellozoom").toString());
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 创建Znode节点
                                                                           */
                                                                          @Test
                                                                          public void createZookeeperAPITest(){
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(5000, 5000);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .sessionTimeoutMs(3000)
                                                                                      .connectString(connect)
                                                                                      .retryPolicy(Retry)
                                                                                      .build();
                                                                              client.start();
                                                                              try {
                                                                                  client
                                                                                          .create()
                                                                                          .creatingParentContainersIfNeeded()
                                                                                          .withMode(CreateMode.PERSISTENT)
                                                                                          .forPath("/code3","init0".getBytes());
                                                                              } catch (Exception e) {
                                                                                  e.printStackTrace();
                                                                              }finally {
                                                                                  client.close();
                                                                              }
                                                                          }
                                                                          /**
                                                                           * 使用异步接口创建节点
                                                                           */
                                                                          @Test
                                                                          public void backgroundZookeeperAPITest(){
                                                                              RetryPolicy retry = new ExponentialBackoffRetry(5000, 5000);
                                                                      //        创建异步线程池
                                                                              Executor executor = Executors.newFixedThreadPool(2);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .retryPolicy(retry)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .build();
                                                                              client.start();
                                                                              try {
                                                                                  client
                                                                                          .create()
                                                                                          .creatingParentsIfNeeded()
                                                                                          .withMode(CreateMode.PERSISTENT)
                                                                                          .inBackground(((curatorFramework, curatorEvent) -> {
                                                                                              System.out.println(String.format("eventType:%s;\\n eventResult:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
                                                                                          }),executor)
                                                                                          .forPath("/code888","init889".getBytes());
                                                                              } catch (Exception e) {
                                                                                  e.printStackTrace();
                                                                              }finally {
                                                                                  client.close();
                                                                              }
                                                                          }
                                                                      • image.gif
                                                                          相关实践学习
                                                                          基于MSE实现微服务的全链路灰度
                                                                          通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
                                                                          相关文章
                                                                          |
                                                                          6月前
                                                                          |
                                                                          存储 缓存 Java
                                                                          【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
                                                                          【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
                                                                          105 0
                                                                          |
                                                                          3月前
                                                                          |
                                                                          Java API Maven
                                                                          【zookeeper 第五篇章】Curator 库
                                                                          Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
                                                                          95 0
                                                                          |
                                                                          3月前
                                                                          |
                                                                          存储 API Apache
                                                                          【zookeeper 第三篇章】客户端 API
                                                                          本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
                                                                          33 0
                                                                          |
                                                                          1月前
                                                                          |
                                                                          分布式计算 Java Hadoop
                                                                          Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
                                                                          Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
                                                                          62 1
                                                                          |
                                                                          2月前
                                                                          |
                                                                          负载均衡 API 数据安全/隐私保护
                                                                          Zookeeper的客户端-原生的API
                                                                          Zookeeper的客户端-原生的API
                                                                          |
                                                                          4月前
                                                                          |
                                                                          API
                                                                          【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
                                                                          ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
                                                                          |
                                                                          6月前
                                                                          |
                                                                          Java API Apache
                                                                          ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          74 11
                                                                          |
                                                                          6月前
                                                                          |
                                                                          存储
                                                                          ZooKeeper客户端常用命令
                                                                          ZooKeeper客户端常用命令
                                                                          66 1
                                                                          |
                                                                          6月前
                                                                          |
                                                                          安全 Java API
                                                                          Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
                                                                          2,/usr/local/data/zookeeper-3,/usr/local/data/zookeeper-4,在每个目录中创建文件。创建四个文件夹/usr/local/data/zookeeper-1,/usr/local/data/zookeeper-Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点。己对外提供服务的起始状态。E: 角色, 默认是 participant,即参与过半机制的角色,选举,事务请求过半提交,还有一个是。
                                                                          |
                                                                          6月前
                                                                          Zookeeper的客户端的命令
                                                                          Zookeeper的客户端的命令
                                                                          45 0