编辑
👨🏻🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟
🌈擅长领域:Java、大数据、运维、电子
🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!
🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!
- 使用的项目构建工具为Maven,使用坐标如下:
<dependencies> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.13.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.13.0</version> </dependency> <!--google的工具类--> <dependency> <groupId>com.google.collections</groupId> <artifactId>google-collections</artifactId> <version>1.0</version> </dependency> <!--使用slf4j日志--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <!--junit单元测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
- Curator包含的几个包:
- curator-framework:对zookeeper的底层api的一些封装
- curator-client:提供一些客户端的操作,例如重试策略等
- urator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Curator的基本API
创建会话
- 使用静态方法创建客户端
RetryPolicy retry = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory.newClient(connect, 5000, 2000, retry);
- newClient包含四个主要的参数:
Untitled - 使用功Fluent风格的Api创建会话
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(retry) .build();
- 创建包含隔离命名空间的会话
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径,例如(下面的例子)当客户端指定了独立命名空间为“/mybase”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的
RetryPolicy retry = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(retry) .namespace("mybase") .build();
- 启动客户端
- 当创建会话成功,得到client实例然后可以直接调用其的start()方法打开客户端
client.start();
- 数据节点操作创建数据节点Zookeeper的节点创建模式:
- PERSISTENT:持久化
- PERSISTENT_SQUENTIAL:持久化并带有序列号
- EPHEMERAL:临时
- EPHEMERAL_SQUENTIAL:临时并带有序列号
- 创建一个节点,初始化内容为空
client.create().forPath("path"); //如果没有指定节点属性,节点创建模式默认为持久化节点,内容默认为空
- 创建一个节点,附带初始化内容
client.create().forPath("path","init".getBytes());
- 创建一个节点,指定创建模式(临时节点),内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
- 创建一个节点,指定创建模式(临时节点),附带初始化内容
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes);
- 创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点。
client.create() .creatingParentContainersIfNeeded() .withMode(CraeteMOde.EPHEMERAL) .forPath("path","init".getBytes);
- 这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
- 删除数据节点
- 删除一个节点
client.delete().forPath("path"); //注意:此方法只能删除叶子节点,否则会抛出异常
- 删除一个节点,并且递归删除其他所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path);
- 删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
- 删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
- guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
- 读取数据节点数据
- 读取一个数据节点的数据内容
client.getData().forPath("path");
- 注意,此方法返回值是byte[]
- 读取一个节点的数据内容,同时获取到该节点的stat
Stat stat=new Stat(); Client.getData().storingStatIn(stat).forPath("path");
- 更新数据节点
- 更新一个节点的数据内容
client.setData().forPath("path","data".getBytes());
- 注意:该接口会返回一个Stat实例
- 更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
- 检查节点时候存在
client.checkExists().forPath("path");
- 注意:该方法返回一个Stat实例,用于检查Znode是否存在的操作,可以协调额外的方法监控或者后台处理)并在最后调用forPath()指定要操作的Znode
- 获取某个节点的所有子节点路径
client.getChildren.forPath("path");
- 注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
- 事务
- CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交
client.inTransaction().check().forPath("path") .and() .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes()) .and() .setData().withVersion(10086).forPath("path","data2".getBytes()) .and() .commit();
- 异步接口
- 创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息编辑
- 响应吗(#getResultCode())编辑
- 异步节点创建法:
Executor executor = Executors.newFixedThreadPool(2); client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode())); },executor) .forPath("path");
- 注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
- 实验demo
private final String connect="192.168.123.72:2180,192.168.123.73:2180,192.168.123.74:2180"; /** * 在zookeeper的/目录下创建节点 * @throws Exception */ @Test public void writeZookeeperAPITest() throws Exception { RetryPolicy retry = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(retry) .namespace("mybase") .build(); client.start(); client.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/zoomdem"+new SimpleDateFormat("hh:mm:ss").format(new Date()),new SimpleDateFormat("yyyy-MM-dd").format(new Date()) .getBytes()); client.close(); } /** * 读取zookeeper节点下的携带参数 * @throws Exception */ @Test public void readZookeeperAPITest() throws Exception { RetryPolicy Retry = new ExponentialBackoffRetry(2000,3); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(connect) .sessionTimeoutMs(3000) .connectionTimeoutMs(4000) .namespace("thisday15") .retryPolicy(Retry) .build(); client.start(); byte[] thisday15s = client.getData().forPath("/hellozoom"); System.out.println(new String(thisday15s)); client.close(); } /** * 删除Zookeeper集群中的一个节点,每一个znode都有一个与之对应stat结构 * @thows Exception */ @Test public void deleteZookeeperAPITest() throws Exception{ RetryPolicy Retry = new ExponentialBackoffRetry(5000, 2200); CuratorFramework client = CuratorFrameworkFactory .builder() .namespace("thisday15") .retryPolicy(Retry) .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .build(); client.start(); System.out.println(client.checkExists().forPath("/hellozoom").toString()); client.delete().forPath("/hellozoom"); System.out.println(client.checkExists().forPath("/hellozoom").toString()); client.close(); } /** * 创建Znode节点 */ @Test public void createZookeeperAPITest(){ RetryPolicy Retry = new ExponentialBackoffRetry(5000, 5000); CuratorFramework client = CuratorFrameworkFactory .builder() .connectionTimeoutMs(5000) .sessionTimeoutMs(3000) .connectString(connect) .retryPolicy(Retry) .build(); client.start(); try { client .create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/code3","init0".getBytes()); } catch (Exception e) { e.printStackTrace(); }finally { client.close(); } } /** * 使用异步接口创建节点 */ @Test public void backgroundZookeeperAPITest(){ RetryPolicy retry = new ExponentialBackoffRetry(5000, 5000); // 创建异步线程池 Executor executor = Executors.newFixedThreadPool(2); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(connect) .retryPolicy(retry) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .build(); client.start(); try { client .create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .inBackground(((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s;\\n eventResult:%s",curatorEvent.getType(),curatorEvent.getResultCode())); }),executor) .forPath("/code888","init889".getBytes()); } catch (Exception e) { e.printStackTrace(); }finally { client.close(); } }