what
Curator(监护人;管理者) 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java客户端。
use
- 依赖
<dependencies> <!--Curator 相关依赖--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency> <!--单元测试相关依赖--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
- 基础API测试类 BasicOperationTest.java
public class BasicOperationTest {
private CuratorFramework client = null;
/**
* zookeeper服务器地址
*/
private static final String ZK_SERVER_PATH = "xxx.xx.xxx.xxx:2181";
private static final String NODE_PATH = "/hadoop/yarn";
@Before
public void prepare() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(ZK_SERVER_PATH)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
//指定命名空间后,client 的所有路径操作都会以 / workspace 开头
client.start();
}
/**
* 判断服务状态
*/
@Test
public void getStatus() {
CuratorFrameworkState state = client.getState();
System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
}
/**
* 创建节点
*
* @throws Exception 可能出现异常
*/
@Test
public void createNodes() throws Exception {
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
//节点类型
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(NODE_PATH, data);
}
/**
* 获取节点信息
*
* @throws Exception 可能出现异常
*/
@Test
public void getNode() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(NODE_PATH);
System.out.println("节点数据:" + new String(data));
System.out.println("节点信息:" + stat.toString());
}
/**
* 获取子节点列表
*
* @throws Exception 可能出现异常
*/
@Test
public void getChildrenNodes() throws Exception {
List<String> childNodes = client.getChildren().forPath("/hadoop");
for (String s : childNodes) {
System.out.println(s);
}
}
/**
* 更新节点
*
* @throws Exception 可能出现异常
*/
@Test
public void updateNode() throws Exception {
byte[] newData = "defg".getBytes();
// 传入版本号,如果版本号错误则拒绝更新操作,并抛出 BadVersion 异常
client.setData().withVersion(0)
.forPath(NODE_PATH, newData);
}
/**
* 删除节点
*
* @throws Exception 可能出现异常
*/
@Test
public void deleteNodes() throws Exception {
client.delete()
// 如果删除失败,那么在会继续执行,直到成功
.guaranteed()
// 如果有子节点,则递归删除
.deletingChildrenIfNeeded()
// 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
.withVersion(0)
.forPath(NODE_PATH);
}
/**
* 判断节点是否存在
*
* @throws Exception 可能出现异常
*/
@Test
public void existNode() throws Exception {
// 如果节点存在则返回其状态信息如果不存在则为 null
Stat stat = client.checkExists().forPath(NODE_PATH + "aa/bb/cc");
System.out.println("节点是否存在:" + (stat != null));
}
/**
* 创建一次监听
*
* @throws Exception 可能出现异常
*/
@Test
public void DisposableWatch() throws Exception {
client.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
}
}).forPath(NODE_PATH);
//休眠以观察测试效果
Thread.sleep(1000 * 1000);
}
/**
* 创建永久监听
*
* @throws Exception 可能出现异常
*/
@Test
public void permanentWatch() throws Exception {
// 使用 NodeCache 包装节点,对其注册的监听作用于节点,且是永久性的
final NodeCache nodeCache = new NodeCache(client, NODE_PATH);
// 通常设置为 true, 代表创建 nodeCache 时,就去获取对应节点的值并缓存
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("节点路径:" + currentData.getPath() +
"数据:" + new String(currentData.getData()));
}
}
});
// 休眠以观察测试效果
Thread.sleep(1000 * 1000);
}
/**
* 监听字节点
*
* @throws Exception 可能出现异常
*/
@Test
public void permanentChildrenNodesWatch() throws Exception {
// 第三个参数代表除了节点状态外,是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop",
true);
/*
* StartMode 代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x -> System.out.println(x.getPath()));
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache 初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" +
event.getData().getPath());
System.out.println("修改后的数据:" + new
String(event.getData().getData()));
break;
default:
System.out.println("无匹配!");
}
}
});
//休眠以观察测试效果
Thread.sleep(1000 * 1000);
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}