Zookeeper客户端Curator使用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Zookeeper客户端Curator使用

 

image.gif编辑

👨🏻‍🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟

🌈擅长领域:Java、大数据、运维、电子

🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!

🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!

 


    • 使用的项目构建工具为Maven,使用坐标如下:
    <dependencies>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.13.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.13.0</version>
            </dependency>
      <!--google的工具类-->
            <dependency>
                <groupId>com.google.collections</groupId>
                <artifactId>google-collections</artifactId>
                <version>1.0</version>
            </dependency>
    <!--使用slf4j日志-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    <!--junit单元测试-->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    • image.gif
    • Curator包含的几个包:
      1. curator-framework:对zookeeper的底层api的一些封装
      2. curator-client:提供一些客户端的操作,例如重试策略等
      3. urator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

        Curator的基本API

        创建会话

          1. 使用静态方法创建客户端
          RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
          CuratorFramework client = CuratorFrameworkFactory.newClient(connect, 5000, 2000, retry);
          1. image.gif
            • newClient包含四个主要的参数:
              Untitled
            • 使用功Fluent风格的Api创建会话
            CuratorFramework client = CuratorFrameworkFactory
                            .builder()
                            .connectString(connect)
                            .sessionTimeoutMs(5000)
                            .connectionTimeoutMs(3000)
                            .retryPolicy(retry)
                            .build();
            • image.gif
            • 创建包含隔离命名空间的会话
              为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径,例如(下面的例子)当客户端指定了独立命名空间为“/mybase”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的
            RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
                    CuratorFramework client = CuratorFrameworkFactory
                            .builder()
                            .connectString(connect)
                            .sessionTimeoutMs(5000)
                            .connectionTimeoutMs(3000)
                            .retryPolicy(retry)
                            .namespace("mybase")
                            .build();
            • image.gif
            • 启动客户端
              • 当创建会话成功,得到client实例然后可以直接调用其的start()方法打开客户端
              client.start();
              • image.gif
                • 数据节点操作创建数据节点Zookeeper的节点创建模式:
                  1. PERSISTENT:持久化
                  2. PERSISTENT_SQUENTIAL:持久化并带有序列号
                  3. EPHEMERAL:临时
                  4. EPHEMERAL_SQUENTIAL:临时并带有序列号
                    • 创建一个节点,初始化内容为空
                    client.create().forPath("path");
                    //如果没有指定节点属性,节点创建模式默认为持久化节点,内容默认为空
                    • image.gif
                    • 创建一个节点,附带初始化内容
                    client.create().forPath("path","init".getBytes());
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),内容为空
                    client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),附带初始化内容
                    client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes);
                    • image.gif
                    • 创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点。
                    client.create()
                      .creatingParentContainersIfNeeded()
                      .withMode(CraeteMOde.EPHEMERAL)
                      .forPath("path","init".getBytes);
                    • image.gif
                      • 这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
                        • 删除数据节点
                          • 删除一个节点
                            client.delete().forPath("path");
                            //注意:此方法只能删除叶子节点,否则会抛出异常
                            • image.gif
                              • 删除一个节点,并且递归删除其他所有的子节点
                              client.delete().deletingChildrenIfNeeded().forPath("path);
                              • image.gif
                              • 删除一个节点,强制指定版本进行删除
                              client.delete().withVersion(10086).forPath("path");
                              • image.gif
                              • 删除一个节点,强制保证删除
                              client.delete().guaranteed().forPath("path");
                              • image.gif
                                • guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
                                  • 读取数据节点数据
                                    • 读取一个数据节点的数据内容
                                    client.getData().forPath("path");
                                    • image.gif
                                      • 注意,此方法返回值是byte[]
                                        • 读取一个节点的数据内容,同时获取到该节点的stat
                                        Stat stat=new Stat();
                                        Client.getData().storingStatIn(stat).forPath("path");
                                        • image.gif
                                        • 更新数据节点
                                          • 更新一个节点的数据内容
                                          client.setData().forPath("path","data".getBytes());
                                          • image.gif
                                            • 注意:该接口会返回一个Stat实例
                                              • 更新一个节点的数据内容,强制指定版本进行更新
                                              client.setData().withVersion(10086).forPath("path","data".getBytes());
                                              • image.gif
                                              • 检查节点时候存在
                                              client.checkExists().forPath("path");
                                              • image.gif
                                                • 注意:该方法返回一个Stat实例,用于检查Znode是否存在的操作,可以协调额外的方法监控或者后台处理)并在最后调用forPath()指定要操作的Znode
                                                  • 获取某个节点的所有子节点路径
                                                  client.getChildren.forPath("path");
                                                  • image.gif
                                                    • 注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
                                                      • 事务
                                                        • CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交
                                                          client.inTransaction().check().forPath("path")
                                                                .and()
                                                                .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
                                                                .and()
                                                                .setData().withVersion(10086).forPath("path","data2".getBytes())
                                                                .and()
                                                                .commit();
                                                          • image.gif
                                                            • 异步接口
                                                              • 创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息image.gif编辑
                                                                • 响应吗(#getResultCode())image.gif编辑
                                                                  • 异步节点创建法:
                                                                  Executor executor = Executors.newFixedThreadPool(2);
                                                                  client.create()
                                                                        .creatingParentsIfNeeded()
                                                                        .withMode(CreateMode.EPHEMERAL)
                                                                        .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
                                                                        },executor)
                                                                        .forPath("path");
                                                                  • image.gif
                                                                    • 注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
                                                                      • 实验demo
                                                                      private final String connect="192.168.123.72:2180,192.168.123.73:2180,192.168.123.74:2180";
                                                                          /**
                                                                           * 在zookeeper的/目录下创建节点
                                                                           * @throws Exception
                                                                           */
                                                                          @Test
                                                                          public void writeZookeeperAPITest() throws Exception {
                                                                            RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(3000)
                                                                                      .retryPolicy(retry)
                                                                                      .namespace("mybase")
                                                                                      .build();
                                                                              client.start();
                                                                              client.create()
                                                                                      .creatingParentContainersIfNeeded()
                                                                                      .withMode(CreateMode.PERSISTENT)
                                                                                      .forPath("/zoomdem"+new SimpleDateFormat("hh:mm:ss").format(new Date()),new SimpleDateFormat("yyyy-MM-dd").format(new Date())
                                                                                              .getBytes());
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 读取zookeeper节点下的携带参数
                                                                           * @throws Exception
                                                                           */
                                                                          @Test
                                                                          public void readZookeeperAPITest() throws Exception {
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(2000,3);
                                                                              CuratorFramework client
                                                                                      = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(3000)
                                                                                      .connectionTimeoutMs(4000)
                                                                                      .namespace("thisday15")
                                                                                      .retryPolicy(Retry)
                                                                                      .build();
                                                                              client.start();
                                                                              byte[] thisday15s = client.getData().forPath("/hellozoom");
                                                                              System.out.println(new String(thisday15s));
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 删除Zookeeper集群中的一个节点,每一个znode都有一个与之对应stat结构
                                                                           *  @thows Exception
                                                                           */
                                                                          @Test
                                                                          public void deleteZookeeperAPITest() throws Exception{
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(5000, 2200);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .namespace("thisday15")
                                                                                      .retryPolicy(Retry)
                                                                                      .connectString(connect)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .build();
                                                                              client.start();
                                                                              System.out.println(client.checkExists().forPath("/hellozoom").toString());
                                                                              client.delete().forPath("/hellozoom");
                                                                              System.out.println(client.checkExists().forPath("/hellozoom").toString());
                                                                              client.close();
                                                                          }
                                                                          /**
                                                                           * 创建Znode节点
                                                                           */
                                                                          @Test
                                                                          public void createZookeeperAPITest(){
                                                                              RetryPolicy Retry = new ExponentialBackoffRetry(5000, 5000);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .sessionTimeoutMs(3000)
                                                                                      .connectString(connect)
                                                                                      .retryPolicy(Retry)
                                                                                      .build();
                                                                              client.start();
                                                                              try {
                                                                                  client
                                                                                          .create()
                                                                                          .creatingParentContainersIfNeeded()
                                                                                          .withMode(CreateMode.PERSISTENT)
                                                                                          .forPath("/code3","init0".getBytes());
                                                                              } catch (Exception e) {
                                                                                  e.printStackTrace();
                                                                              }finally {
                                                                                  client.close();
                                                                              }
                                                                          }
                                                                          /**
                                                                           * 使用异步接口创建节点
                                                                           */
                                                                          @Test
                                                                          public void backgroundZookeeperAPITest(){
                                                                              RetryPolicy retry = new ExponentialBackoffRetry(5000, 5000);
                                                                      //        创建异步线程池
                                                                              Executor executor = Executors.newFixedThreadPool(2);
                                                                              CuratorFramework client = CuratorFrameworkFactory
                                                                                      .builder()
                                                                                      .connectString(connect)
                                                                                      .retryPolicy(retry)
                                                                                      .sessionTimeoutMs(5000)
                                                                                      .connectionTimeoutMs(5000)
                                                                                      .build();
                                                                              client.start();
                                                                              try {
                                                                                  client
                                                                                          .create()
                                                                                          .creatingParentsIfNeeded()
                                                                                          .withMode(CreateMode.PERSISTENT)
                                                                                          .inBackground(((curatorFramework, curatorEvent) -> {
                                                                                              System.out.println(String.format("eventType:%s;\\n eventResult:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
                                                                                          }),executor)
                                                                                          .forPath("/code888","init889".getBytes());
                                                                              } catch (Exception e) {
                                                                                  e.printStackTrace();
                                                                              }finally {
                                                                                  client.close();
                                                                              }
                                                                          }
                                                                      • image.gif
                                                                          相关实践学习
                                                                          基于MSE实现微服务的全链路灰度
                                                                          通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
                                                                          相关文章
                                                                          |
                                                                          1月前
                                                                          |
                                                                          存储 缓存 Java
                                                                          【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
                                                                          【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
                                                                          50 0
                                                                          |
                                                                          1月前
                                                                          |
                                                                          Java API Apache
                                                                          ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          40 11
                                                                          |
                                                                          1月前
                                                                          |
                                                                          存储
                                                                          ZooKeeper客户端常用命令
                                                                          ZooKeeper客户端常用命令
                                                                          32 1
                                                                          |
                                                                          1月前
                                                                          |
                                                                          安全 Java API
                                                                          Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
                                                                          2,/usr/local/data/zookeeper-3,/usr/local/data/zookeeper-4,在每个目录中创建文件。创建四个文件夹/usr/local/data/zookeeper-1,/usr/local/data/zookeeper-Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点。己对外提供服务的起始状态。E: 角色, 默认是 participant,即参与过半机制的角色,选举,事务请求过半提交,还有一个是。
                                                                          |
                                                                          1月前
                                                                          Zookeeper的客户端的命令
                                                                          Zookeeper的客户端的命令
                                                                          26 0
                                                                          |
                                                                          1月前
                                                                          |
                                                                          缓存 Java API
                                                                          Zookeeper(持续更新) VIP-02 Zookeeper客户端使用与集群特性
                                                                          Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。
                                                                          |
                                                                          1月前
                                                                          |
                                                                          Apache
                                                                          Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
                                                                          Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
                                                                          50 2
                                                                          |
                                                                          1月前
                                                                          |
                                                                          存储 设计模式 算法
                                                                          深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
                                                                          当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
                                                                          143 1
                                                                          深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
                                                                          |
                                                                          1月前
                                                                          |
                                                                          存储 设计模式 算法
                                                                          深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
                                                                          当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
                                                                          118 0
                                                                          |
                                                                          1月前
                                                                          |
                                                                          Java API Apache
                                                                          ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
                                                                          35 0