概述
Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:分布式栅栏、共享锁服务、Master选举机制和分布式计数器等。
官网地址:curator.apache.org/index.html
项目组件
名称 | 描述 |
Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
Utilities | 为Zookeeper提供的各种实用程序。 |
Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.1</version> </dependency>
- 我们只要引入curator-recipes,就会自动将其他的依赖引入。
- 我们文章使用的是5.2.1的版本,它对应的zookeeper api版本是3.6.3。
使用案例
创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory
工厂类的newClient方法返回CuratorFramework,它有多个重载方法, 最后调用start方法创建连接。
CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
查看3个方法的源码,发现最终他们都是使用建造者模式,统一调用了builder的.build方法。
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig) { return builder(). connectString(connectString). sessionTimeoutMs(sessionTimeoutMs). connectionTimeoutMs(connectionTimeoutMs). retryPolicy(retryPolicy). zkClientConfig(zkClientConfig). build(); }
参数说明:
参数 | 说明 |
connectString | zk server的地址,多个server用逗号分隔 |
sessionTimeoutMs | 会话超时时间,默认是60s |
connectionTimeoutMs | 连接超时时间,默认是15s |
retryPolicy | 失败重试策略 |
zkClientConfig | zk客户端的配置,比如认证相关的配置 |
代码例子:
@Before public void init() { // 重试机制, 每个1秒钟重试1次,最多重试5次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5, 5000); // 定义客户端 CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 30000, 60000, retryPolicy); // 建立连接 curatorFramework.start(); }
- 重试机制RetryPolicy有很多内置的实现,可以看他的javadoc一目了然。
- 最后需要调用start方法建立连接。
创建节点
Curator创建节点的方法也是基于Fluent风格编码,原生API中的参数很多都转化为一层层的方法调用来进行设置,直接上例子。
@Test public void testCreateNode() throws Exception { // 创建临时的,任何人都有权限的节点 String result = curatorFramework.create().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8)); log.warn("创建节点: [{}]", result); }
creatingParentContainersIfNeeded
方法用来递归创建父节点。withMode
方法可以设置节点的属性,是持久节点,临时节点,持久有序节点、临时有序节点等。withACL
方法可以设置节点的访问权限。forPath
方法设置节点的路径和对应值。- 更多方法使用可以自行参考java doc文档。
更新节点
@Test public void testUpdateNode() throws Exception { // 创建节点 curatorFramework.create().creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8)); // 不带版本修改节点 Stat stat = curatorFramework.setData().forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8)); log.info("修改后节点版本: [{}]", stat.getVersion()); // 带版本修改节点,版本不正确,报错 stat = curatorFramework.setData().withVersion(10).forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8)); log.info("修改后节点版本: [{}]", stat.getVersion()); }
withVersion
方法用来指定版本,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常,可以用来实现乐观锁。
查看节点
@Test public void testGetNode() throws Exception { // 创建节点 curatorFramework.create().creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8)); // 直接查询 byte[] data = curatorFramework.getData().forPath("/emNode"); log.info("节点数据: [{}]", new String(data)); // 同时获取节点属性,放到stat中 Stat stat = new Stat(); data = curatorFramework.getData().storingStatIn(stat).forPath("/emNode"); log.info("节点数据: [{}]", new String(data)); log.info("节点版本: [{}]", stat.getVersion()); // 查询的时候注册监听 data = curatorFramework.getData().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { log.info("事件监听: [{}]", event); } }).forPath("/emNode"); log.info("节点数据: [{}]", new String(data)); Thread.sleep(100000); }
getData()
方法用来获取GetDataBuilder
查询器。storingStatIn
方法用来存放查询到的节点属性信息。usingWatcher
方法用来注册监听器,这个方法只会一次监听。
删除节点
@Test public void testDeleteNode() throws Exception { // 创建节点 curatorFramework.create().creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8)); // 方法 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/emNode"); }
delete()
用来创建删除构造器DeleteBuilder
。deletingChildrenIfNeeded()
删除节点并递归删除其子节点。withVersion()
指定版本删除,如果版本不对抛出异常guaranteed()
方法强制保证删除一个节点,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。
监听器使用
Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器,上面已经做了演示。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。
Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。以前的Cache事件监听的种类有3种Path Cache, Cache,Tree Cache, 在新版本中合并为一种CuratorCache, 我们以CuratorCache新版演示为例子。
CuratorCache试图将来自节点的数据保存在本地缓存中。还可以选择缓存该节点下的整个子树。将响应更新/创建/删除事件,下拉数据等。您可以注册侦听器,以便在发生更改时得到通知。
@Test public void testWatchCache() throws Exception { try { String workPath = "/aa/bb/cc"; // 创建节点 curatorFramework.create().creatingParentContainersIfNeeded() .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(workPath, "hello".getBytes(StandardCharsets.UTF_8)); // 创建缓存 CuratorCache curatorCache = CuratorCache.builder(curatorFramework, workPath).build(); CuratorCacheListener curatorCacheListener = new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData data) { log.info("%%%%%%%%%%%%%%%watch start……………………………………………………"); log.info("change type: [{}]", type); log.info("ZNode节点状态改变, path={}", data.getPath()); log.info("ZNode节点状态改变,before: [{}], after: [{}]", oldData != null ? new String(oldData.getData()): null, new String(data.getData())); } }; curatorCache.listenable().addListener(curatorCacheListener); curatorCache.start(); // 第1次变更节点数据 curatorFramework.setData().forPath("/aa/bb", "第1次更改内容".getBytes()); Thread.sleep(1000); // 第2次变更节点数据 curatorFramework.setData().forPath("/aa/bb/cc", "第2次更改内容".getBytes()); Thread.sleep(1000); // 第3次创建新节点 curatorFramework.create().creatingParentContainersIfNeeded() .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/aa/bb/dd", "hello".getBytes(StandardCharsets.UTF_8)); Thread.sleep(1000); Thread.sleep(5000); } catch (Exception e) { log.error("创建Cache监听失败", e); } finally { // 删除节点 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/aa"); } }
- 通过
CuratorCache.builder
方法创建CuratorCache,默认监听当前节点以及子节点。
小结
上面简单演示了下curator框架操作zookeeper,相对于原生的api,简洁方便很多,更多的使用可以自己参考 doc。