Apache Curator客户端的使用(五)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Apache Curator客户端的使用(五)

zk原生api的不足之处

  • 超时重连,不支持自动,需要手动操作
  • Watch注册一次后会失效
  • 不支持递归创建节点

Apache curator

是apache的开源项目,解决watcher注册一次就失效的问题,api更加简单易用,提供更多解决方案并且实现简单:如分布式锁

创建一个maven工程

引入相关依赖

        <!--zookeeper相关-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>

zk基本操作

zk命名空间以及创建节点,节点的增删改查

import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
public class CuratorOperator {
  public CuratorFramework client = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  /**
   * 实例化zk客户端
   */
  public CuratorOperator() {
    /**
     * curator链接zookeeper的策略:RetryNTimes
     * n:重试的次数
     * sleepMsBetweenRetries:每次重试间隔的时间
     */
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    client = CuratorFrameworkFactory.builder()
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        //命名空间之后创建的节点都会在workspace工作空间里面
        .namespace("workspace").build();
    client.start();
  }
  /**
   *
   * @Description: 关闭zk客户端连接
   */
  public void closeZKClient() {
    if (client != null) {
      this.client.close();
    }
  }
  public static void main(String[] args) throws Exception {
    // 实例化
    CuratorOperator cto = new CuratorOperator();
    boolean isZkCuratorStarted = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
    // 创建节点
    String nodePath = "/bushro/demo";
    byte[] data = "data".getBytes();
    cto.client.create().creatingParentsIfNeeded()
      .withMode(CreateMode.PERSISTENT)
      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//默认的权限"world", "anyone"
      .forPath(nodePath, data);
     更新节点数据
    byte[] newData = "newdata".getBytes();
    cto.client.setData().withVersion(0).forPath(nodePath, newData);
    // 删除节点
    cto.client.delete()
          .guaranteed()         // 如果删除失败,那么在后端还是继续会删除,直到成功
          .deletingChildrenIfNeeded() // 如果有子节点,就删除
          .withVersion(0)
          .forPath(nodePath);
    // 读取节点数据
    Stat stat = new Stat();
    byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
    System.out.println("节点" + nodePath + "的数据为: " + new String(data));
    System.out.println("该节点的版本号为: " + stat.getVersion());
    // 查询子节点
    List<String> childNodes = cto.client.getChildren()
                      .forPath(nodePath);
    System.out.println("开始打印子节点:");
    for (String s : childNodes) {
      System.out.println(s);
    }
    // 判断节点是否存在,如果不存在则为空
    Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
    System.out.println(statExist);
    Thread.sleep(100000);
    cto.closeZKClient();
    boolean isZkCuratorStarted2 = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
  }
  public final static String ADD_PATH = "/super/imooc/d";
}

watch与acl的操作

一次注册多次监听

为节点添加watch事件

    // 为节点添加watcher
    //NodeCache: 监听数据节点的变更,会触发事件
    final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
    // buildInitial : 初始化的时候获取node的值并且缓存
    nodeCache.start(true);
    if (nodeCache.getCurrentData() != null) {
      System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
    } else {
      System.out.println("节点初始化数据为空...");
    }
    nodeCache.getListenable().addListener(new NodeCacheListener() {
      public void nodeChanged() throws Exception {
        if (nodeCache.getCurrentData() == null) {
          System.out.println("空");
          return;
        }
        String data = new String(nodeCache.getCurrentData().getData());
        System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
      }
    });

为子节点添加watch事件

    // 为子节点添加watcher
    // PathChildrenCache: 监听数据节点的增删改,会触发事件
    String childNodePathCache =  nodePath;
    // cacheData: 设置缓存节点的数据状态
    final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
    /**
     * StartMode: 初始化方式
     * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件(比较好)
     * NORMAL:异步初始化
     * BUILD_INITIAL_CACHE:同步初始化
     */
    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
    List<ChildData> childDataList = childrenCache.getCurrentData();
    System.out.println("当前数据节点的子节点数据列表:");
    for (ChildData cd : childDataList) {
      String childData = new String(cd.getData());
      System.out.println(childData);
    }
    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
          System.out.println("子节点初始化ok...");
        }
        else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
          String path = event.getData().getPath();
          System.out.println("添加子节点:" + event.getData().getPath());
          System.out.println("子节点数据:" + new String(event.getData().getData()));
        }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
          System.out.println("删除子节点:" + event.getData().getPath());
        }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
          System.out.println("修改子节点路径:" + event.getData().getPath());
          System.out.println("修改子节点数据:" + new String(event.getData().getData()));
        }
      }
    });

watcher统一配置

例如创建一个存放redis的配置文件节点,里面放入相关操作的json数据如

{“type”:“add”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“add”}

{“type”:“update”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“update”}

{“type”:“delete”,“url”:"",“remark”:“delete”}

在程序中我们可以把节点数据转成实体类,根据type的类型来判断进行相应的操作,当zookeeper集群中监听到变化,每台机子就去下载最新的配置文件这样就不用一台一台机子的修改过去,节省时间。

public class Client1 {
  public CuratorFramework client = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public Client1() {
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    client = CuratorFrameworkFactory.builder()
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();
    client.start();
  }
  public void closeZKClient() {
    if (client != null) {
      this.client.close();
    }
  }
//  public final static String CONFIG_NODE = "/bushro/demo/redis-config";
  public final static String CONFIG_NODE_PATH = "/bushro/demo";
  public final static String SUB_PATH = "/redis-config";
  public static CountDownLatch countDown = new CountDownLatch(1);
  public static void main(String[] args) throws Exception {
    Client1 cto = new Client1();
    System.out.println("client1 启动成功...");
    final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
    childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
    // 添加监听事件
    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        // 监听节点变化
        if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
          String configNodePath = event.getData().getPath();
          if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
            System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
            // 读取节点数据
            String jsonConfig = new String(event.getData().getData());
            System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
            // 从json转换配置(转实体类)
            RedisConfig redisConfig = null;
            if (StringUtils.isNotBlank(jsonConfig)) {
              redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
            }
            // 配置不为空则进行相应操作
            if (redisConfig != null) {
              String type = redisConfig.getType();
              String url = redisConfig.getUrl();
              String remark = redisConfig.getRemark();
              // 判断事件
              if (type.equals("add")) {
                System.out.println("监听到新增的配置,准备下载...");
                // ... 连接ftp服务器,根据url找到相应的配置
                Thread.sleep(500);
                System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                // ... 下载配置到你指定的目录
                Thread.sleep(1000);
                System.out.println("下载成功,已经添加到项目中");
                // ... 拷贝文件到项目目录
              } else if (type.equals("update")) {
                System.out.println("监听到更新的配置,准备下载...");
                // ... 连接ftp服务器,根据url找到相应的配置
                Thread.sleep(500);
                System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                // ... 下载配置到你指定的目录
                Thread.sleep(1000);
                System.out.println("下载成功...");
                System.out.println("删除项目中原配置文件...");
                Thread.sleep(100);
                // ... 删除原文件
                System.out.println("拷贝配置文件到项目目录...");
                // ... 拷贝文件到项目目录
              } else if (type.equals("delete")) {
                System.out.println("监听到需要删除配置");
                System.out.println("删除项目中原配置文件...");
              }
            }
          }
        }
      }
    });
    countDown.await();
    cto.closeZKClient();
  }
}

acl权限操作与认证授权

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import com.imooc.utils.AclUtils;
public class CuratorAcl {
  public CuratorFramework client = null;
  public static final String zkServerPath = "192.168.254.130:2181";
  public CuratorAcl() {
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    client = CuratorFrameworkFactory.builder().authorization("digest", "bushro1:123456".getBytes())//使用账号密码认证,可以认证多个用户
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();
    client.start();
  }
  public void closeZKClient() {
    if (client != null) {
      this.client.close();
    }
  }
  public static void main(String[] args) throws Exception {
    // 实例化
    CuratorAcl cto = new CuratorAcl();
    boolean isZkCuratorStarted = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
    String nodePath = "/acl/father/child/sub";
    List<ACL> acls = new ArrayList<ACL>();
    Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456"));
    Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456"));
    acls.add(new ACL(Perms.ALL, bushro1));
    acls.add(new ACL(Perms.READ, bushro2));
    acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2));
    // 创建节点
    byte[] data = "spiderman".getBytes();
    cto.client.create().creatingParentsIfNeeded()//递归方式创建
        .withMode(CreateMode.PERSISTENT)
        .withACL(acls)//该方式只有对最后一个节点设置权限,相应的父节点默认都是"world", "anyone",
                      // 如果后面添加true那么所有创建的节点都是设置的权限
        .forPath(nodePath, data);
    //设置权限
    cto.client.setACL().withACL(acls).forPath("/curatorNode");
    // 更新节点数据
    byte[] newData = "batman".getBytes();
    cto.client.setData().withVersion(0).forPath(nodePath, newData);
    // 删除节点
    cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);
    // 读取节点数据
    Stat stat = new Stat();
    byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
    System.out.println("节点" + nodePath + "的数据为: " + new String(data));
    System.out.println("该节点的版本号为: " + stat.getVersion());
    cto.closeZKClient();
    boolean isZkCuratorStarted2 = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
  }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8月前
|
存储 Java Linux
【Zookeeper】Introduction to Apache Curator
【Zookeeper】Introduction to Apache Curator
152 0
|
2月前
|
Java Apache C++
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
Thrift 是一个轻量级、跨语言的远程服务调用框架,由 Facebook 开发并贡献给 Apache。它通过 IDL 生成多种语言的 RPC 服务端和客户端代码,支持 C++、Java、Python 等。Thrift 的主要特点包括开发速度快、接口维护简单、学习成本低和多语言支持。广泛应用于 Cassandra、Hadoop 等开源项目及 Facebook、百度等公司。
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
|
8月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
82 11
|
8月前
|
Java API Apache
Apache CXF生成WebService的客户端
Apache CXF生成WebService的客户端
266 0
|
8月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
149 2
|
8月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
68 0
|
8月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
74 0
|
缓存 Java API
Apache ZooKeeper - 使用Apache Curator操作ZK
Apache ZooKeeper - 使用Apache Curator操作ZK
183 0
|
Dubbo 应用服务中间件 Apache
java.lang.NoClassDefFoundError: org/apache/curator/framework/CuratorFrameworkFactory
java.lang.NoClassDefFoundError: org/apache/curator/framework/CuratorFrameworkFactory
504 0
java.lang.NoClassDefFoundError: org/apache/curator/framework/CuratorFrameworkFactory
|
JSON Java Apache
如果你想在Java代码中写一个Http客户端,你会选择哪一种方式?Okhttp vs Apache vs Jdk
如果你想在Java代码中写一个Http客户端,你会选择哪一种方式?Okhttp vs Apache vs Jdk

推荐镜像

更多