叨絮
前面把理论都大致撸了一下,接下来,我们就看看具体怎么用他吧
下载 安装
zookeeper单机部署部署
- 下载地址
mirror.bit.edu.cn/apache/zook…
- 解压zookeeper 压缩包
tar -xvf apache-zookeeper-3.6.2-bin.tar.gz
- 进入解压后的zookeeper目录
cd apache-zookeeper-3.6.2-bin
- 编辑 zoo.cfg文件
cd ZOOKEEPER_HOME/conf vim zoo.cfg 添加如下内容:
tickTime=2000 dataDir=/usr/local/zookeeper/data clientPort=2181 复制代码
保存编辑
- 启动zookeeper
cd ZOOKEEPER_HOME/bin
执行 ./zkServer.sh start
ZOOKEEPER集群部署方式
集群部署方式 在 zoo.cfg 中添加每个集群服务器的ip配置
tickTime=2000 dataDir=/usr/local/zookeeper/data // zookeeper 数据目录 clientPort=2181 initLimit=5 syncLimit=2 server.1=yourServerIP:2888:3888 server.2=yourServerIP:2888:3888 server.3=yourServerIP:2888:3888 复制代码
dataDir=/usr/local/zookeeper/data zookeeper 目录中添加myid 文件 vi myid 输入内容对应 服务id 每个zookeeper 服务器 分别 对应输入 1 2 3
设置完成后分别启动zookeeper集群服务器
ZOOKEEPER_HOME/bin/zkServer.sh start 复制代码
客户端命令行操作
启动客户端
bin/zkCli.sh 复制代码
至于它的其他命令,我这边就不一一去列举了,最主要
详解Zookeeper客户端Curator的使用
zookeeper的原生使用的话,这边就不一一讲了,而且生产上也不建议用原生的,最好是使用人家封装好的, 而Curator无疑是Zookeeper客户端中的瑞士军刀,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
依赖
<!--zookeeper--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> <exclusions> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> </exclusions> </dependency> 复制代码
Curator包含了几个包:
- curator-framework:对zookeeper的底层api的一些封装
- curator-client:提供一些客户端的操作,例如重试策略等
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
这边我要说明一下的是 我们的zk版本,为啥我要把zk排除呢?就是因为不兼容的问题,所以呢,我们必须要把zk的客户端版本,和我们服务端版本一致才可以。
Api
会话创建
我们知道 zk 是cs 架构, 也就是分为服务端和客户端,我们的curator 其实就是一个客户端,那么首先要做的事情,当然就是和服务端建立连接嘛
- 使用静态工厂方法创建会话
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zkAddr, 5000, 3000, new ExponentialBackoffRetry(1000, 5)); 复制代码
构造函数主要有4个参数:
- 使用Fluent风格创建会话
CuratorFramework zkClient = CuratorFrameworkFactory.builder().connectString(zkAddr) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(new ExponentialBackoffRetry(1000, 5)) .build(); 复制代码
启动客户端
当创建会话成功,得到实例然后可以直接调用其start( )方法启动客户端。
zkClient.start(); 复制代码
数据节点操作
zk一般有以下4种节点类型
- PERSISTENT:持久节点
- PERSISTENT_SEQUENTIAL:持久顺序节点
- EPHEMERAL:临时节点
- EPHEMERAL_SEQUENTIAL:临时顺序节点
创建
// 创建一个节点,初始内容为空 // 如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空 zkClient.create().forPath("test"); // 创建一个节点,附带初始化内容 zkClient.create().forPath("test", "contect".getBytes()); // 创建一个节点,指定创建模式(临时节点),内容为空 zkClient.create().withMode(CreateMode.EPHEMERAL).forPath("test"); // 创建一个节点,指定创建模式(临时节点),附带初始化内容 zkClient.create().withMode(CreateMode.EPHEMERAL).forPath("test", "contect".getBytes()); // 创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点 zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("test", "contect".getBytes()); 复制代码
自动递归创建父节点非常有用,一般情况创建一个子节点必须先判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()能够自动递归创建所有所需的父节点。
删除
// 删除一个节点 // 此方法只能删除叶子节点,否则会抛出异常。 zkClient.delete().forPath("test"); // 删除一个节点,并且递归删除其所有的子节点 zkClient.delete().deletingChildrenIfNeeded().forPath("test"); // 删除一个节点,强制指定版本进行删除 zkClient.delete().withVersion(1000).forPath("test"); // 删除一个节点,强制保证删除 // guaranteed()是一个保障措施,只要客户端会话有效,会在后台持续进行删除操作,直到成功。 zkClient.delete().guaranteed().forPath("test"); // 上面的多个流式接口是可以自由组合的,例如: zkClient.delete().guaranteed().deletingChildrenIfNeeded().withVersion(1000).forPath("test"); 复制代码
读取数据节点
// 读取一个节点的数据内容 zkClient.getData().forPath("test"); // 读取一个节点的数据内容,同时获取到该节点的stat Stat stat = new Stat(); zkClient.getData().storingStatIn(stat).forPath("test"); //获取某个节点的所有子节点路径 zkClient.getChildren().forPath("test"); 复制代码
通过传递Stat可以获取到读取的节点状态信息(例如版本号),zookeeper内部根据版本号区别当前的更新是不是最新,所以带版本号更新可避免并行操作带来的数据不一致问题。
更新数据节点
// 更新一个节点的数据内容 // 该接口会返回一个Stat实例 client.setData().forPath("test", "contect".getBytes()); // 更新一个节点的数据内容,强制指定版本进行更新 client.setData().withVersion(1000 ).forPath("test", "contect".getBytes()); // 检查节点是否存在 client.checkExists().forPath("test"); 复制代码
事务
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务。可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。例如:
zkClient.inTransaction().check().forPath("test") .and().create().withMode(CreateMode.PERSISTENT).forPath("test", "contect".getBytes()) .and().setData().forPath("test", "newContect".getBytes()) .and().commit(); 复制代码
异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 300, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "zkSyncThreadPool"); } }); zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode())); }, executor) .forPath("test"); 复制代码
如果inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
监听、缓存
原生Zookeeper通过注册Watcher来进行事件监听,而Watcher只能单次注册,如果需要多次使用则需要反复注册。Cache是Curator中对事件监听的包装,可以看作是在本地缓存了事件监听,从而实现反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
PathChildrenCache
PathChildrenCache监听一个节点(Znode)下面所有子节点的变化,当一个子节点增加, 更新,删除时,PathChildrenCache可以监听到子节点的变化,同时获取子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener。
// 监听路径子节点的变化 PathChildrenCache watcher = new PathChildrenCache(zkClient, getPath(), true); PathChildrenCacheListener childrenCacheListener = (framework, event) -> { if (event.getType() != CHILD_UPDATED || !getPath().equals(event.getData().getPath())) { return; } initDataSource(); System.out.println("数据源更新完毕"); System.out.println("子节点事件类型:" + event.getType() + " | 路径:" + (null != event.getData() ? event.getData().getPath() : null)); }; watcher.getListenable().addListener(childrenCacheListener); watcher.start(); 复制代码
NodeCache
NodeCache与PathChildrenCache类似,但它是监听某个节点的变化,当节点发送变化(增加,删除,内容修改),NodeCache同样可以监听到节点的变化。
// 监听节点变化 NodeCache cache = new NodeCache(zkClient, getPath()); NodeCacheListener nodeCacheListener = () -> { ChildData data = cache.getCurrentData(); if (null != data) { System.out.println("节点数据:" + new String(cache.getCurrentData().getData())); } else { System.out.println("节点被删除!"); } }; cache.getListenable().addListener(nodeCacheListener); cache.start(); 复制代码