curator是Netflix公司开源的一套ZooKeeper客户端,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作。包括连接重连,反复注册Watcher等。实现了Fluent风格的API接口,目前已经为Apache的顶级项目,是全世界使用最广泛的ZooKeeper客户端之一
第一:maven依赖
1 <dependency> 2 <groupId>org.apache.curator</groupId> 3 <artifactId>curator-framework</artifactId> 4 <version>2.8.0</version> 5 </dependency> 6 <dependency> 7 <groupId>org.apache.curator</groupId> 8 <artifactId>curator-recipes</artifactId> 9 <version>2.8.0</version> 10 </dependency>
第二:测试的代码
1 package com.yeepay.sxf.testrurator; 2 3 import java.util.List; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import org.apache.curator.RetryPolicy; 8 import org.apache.curator.framework.CuratorFramework; 9 import org.apache.curator.framework.CuratorFrameworkFactory; 10 import org.apache.curator.framework.CuratorFrameworkFactory.Builder; 11 import org.apache.curator.framework.api.BackgroundCallback; 12 import org.apache.curator.framework.api.CreateBuilder; 13 import org.apache.curator.framework.api.CuratorEvent; 14 import org.apache.curator.framework.api.CuratorEventType; 15 import org.apache.curator.framework.api.DeleteBuilder; 16 import org.apache.curator.framework.api.ExistsBuilder; 17 import org.apache.curator.framework.api.GetChildrenBuilder; 18 import org.apache.curator.framework.api.GetDataBuilder; 19 import org.apache.curator.framework.api.SetDataBuilder; 20 import org.apache.curator.framework.recipes.cache.NodeCache; 21 import org.apache.curator.framework.recipes.cache.NodeCacheListener; 22 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 23 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; 24 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type; 25 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; 26 import org.apache.curator.retry.ExponentialBackoffRetry; 27 import org.apache.curator.retry.RetryNTimes; 28 import org.apache.curator.retry.RetryUntilElapsed; 29 import org.apache.zookeeper.CreateMode; 30 import org.apache.zookeeper.data.Stat; 31 32 /** 33 * 测试Rurator客户端测试 34 * @author sxf 35 * 36 */ 37 public class TestRurator { 38 39 //创建一个线程池,让异步调用的的回调方法,申请线程池中的资源 40 private static ExecutorService es=Executors.newFixedThreadPool(5); 41 public static void main(String[] args) throws Exception { 42 //重试策略 43 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3); 44 45 //客户端构建器 46 Builder builder=CuratorFrameworkFactory.builder(); 47 48 builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号 49 builder.sessionTimeoutMs(5000);//会话的超时时间 50 builder.connectionTimeoutMs(5000);//链接的超时时间 51 builder.retryPolicy(retryPolicy);//指定重试策略 52 53 //构建客户端 54 CuratorFramework client=builder.build(); 55 client.start(); 56 57 //测试创建节点 58 //testCreateNode(client); 59 60 //测试删除节点 61 //testDeleteNode(client); 62 63 //获取子节点列表 64 //getChildList(client); 65 66 //获取节点中存取的值 67 //getNodeValue(client); 68 69 //修改节点中存储数据的值 70 //updateNodeValue(client); 71 72 //判断一个节点是否存在 73 //isExistsNode(client); 74 75 //测试异步调用使用方法 76 //testSynCallBack(client); 77 78 //测试节点数据变动和节点被创建的事件足册 79 //nodeValueChangeListern(client); 80 81 //给某个节点注册监听其子节点的活动 82 nodeChildListernDothing(client); 83 Thread.sleep(Integer.MAX_VALUE); 84 } 85 86 87 /** 88 * 建立客户端链接的第一种方法 89 * @throws InterruptedException 90 */ 91 public void testCreateSession() throws InterruptedException{ 92 //重试策略可以实现该接口。通过自定义规则,进行重试 93 //重试链接的策略的接口 94 //(最开始时间间隔,最多重试次数) 随着次数递增,时间间隔还会增加 95 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3); 96 //(最多重试次数,每次重试间隔的时间长度) 97 RetryPolicy retryPolicy2 =new RetryNTimes(5, 1000); 98 //(重试时间的总长度,每次重试的时间间隔) 这个基本可重试10次 99 RetryPolicy retryPolicy3 =new RetryUntilElapsed(10000, 1000); 100 101 102 103 //(ip地址端口,会话超时时间,建立连接超时时间,重试链接的策略) 104 CuratorFramework client=CuratorFrameworkFactory.newClient("10.151.30.75:2181", 5000,5000,retryPolicy); 105 //启动链接 106 client.start(); 107 108 Thread.sleep(Integer.MAX_VALUE); 109 110 } 111 112 /** 113 * 建立客户端链接的第二种方法 114 */ 115 public static void testCreateSession2(){ 116 //重试策略 117 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3); 118 119 //客户端构建器 120 Builder builder=CuratorFrameworkFactory.builder(); 121 122 builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号 123 builder.sessionTimeoutMs(5000);//会话的超时时间 124 builder.connectionTimeoutMs(5000);//链接的超时时间 125 builder.retryPolicy(retryPolicy);//指定重试策略 126 builder.authorization(null);//给客户端添加权限AuthInfo 相关的知识在原生api里有讲过 127 128 // AuthInfo a=new AuthInfo("ip", "10.151.30.75:2181".getBytes()); 129 // AuthInfo b=new AuthInfo("digest", "shangxiaofei:shangxiaofei".getBytes()); 130 131 //构建客户端 132 CuratorFramework client=builder.build(); 133 134 //建立链接 135 client.start(); 136 137 } 138 139 /** 140 * 测试创建节点(调用顺序) 141 * 两种创建方式都可以 142 * @throws Exception 143 */ 144 public static void testCreateNode(CuratorFramework client) throws Exception{ 145 //创建节点构建器 146 // String path=client.create() 147 // .creatingParentsIfNeeded()//防止创建节点的父亲节点不存在,导致创建节点失败。如果父节点不存在,先创建父节点,再创建子节点 148 // .withMode(CreateMode.PERSISTENT)//指定节点的类型(持久节点,临时节点,持久节并顺序节点,临时并顺序节点) 149 // .forPath("/node_134", "xiaoshuai".getBytes());//节点路径,节点中存储的数据 150 // 151 CreateBuilder builder=client.create(); 152 builder.creatingParentsIfNeeded(); 153 builder.withMode(CreateMode.PERSISTENT); 154 builder.withACL(null);//添加这个节点的权限。相关内容,在原生zookeeper客户端有讲解。上上篇博客 155 /** 156 * //基于ip的权限,意味着这个ip的客户端对此节点有读取权限 157 * ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75")); 158 * //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限 159 * ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei"))); 160 * 161 * List<ACL> myaclAcls=new ArrayList<ACL>(); 162 * myaclAcls.add(ipacl); 163 * myaclAcls.add(digetacl); 164 */ 165 String pathString=builder.forPath("/node_135/node_135_2", "zhengjiuzhonghua".getBytes()); 166 167 168 169 System.out.println("TestRurator.testCreateNode()"+pathString); 170 } 171 172 /** 173 * 测试删除节点(调用顺序不能乱) 174 * @param client 175 * @throws Exception 176 */ 177 public static void testDeleteNode(CuratorFramework client) throws Exception{ 178 DeleteBuilder delteBuilder=client.delete(); 179 delteBuilder.guaranteed();//保证节点必须删除,如果删除出现错误,则后台程序会不断去尝试删除。 180 delteBuilder.deletingChildrenIfNeeded();//如果存在子节点,先删除子节点 181 delteBuilder.withVersion(-1);//指定版本号 182 delteBuilder.forPath("/node_135");//指定路径 183 } 184 185 /** 186 * 获取子节点列表 187 * @param client 188 * @throws Exception 189 */ 190 public static void getChildList(CuratorFramework client) throws Exception{ 191 GetChildrenBuilder getChildBuilder=client.getChildren(); 192 List<String> nodelist=getChildBuilder.forPath("/"); 193 System.out.println("TestRurator.getChildList()"+nodelist); 194 195 } 196 /** 197 * 获取节点中存取的值 198 * @throws Exception 199 */ 200 public static void getNodeValue(CuratorFramework client) throws Exception{ 201 202 GetDataBuilder dataBuilder= client.getData(); 203 204 Stat stat=new Stat(); 205 dataBuilder.storingStatIn(stat); 206 byte[] bytes=dataBuilder.forPath("/node_135/node_135_2"); 207 System.out.println("TestRurator.getNodeValue(data==>)"+new String(bytes)); 208 System.out.println("TestRurator.getNodeValue()"+stat); 209 } 210 211 /** 212 * 修改节点中的数据 213 * @param client 214 * @throws Exception 215 */ 216 public static void updateNodeValue(CuratorFramework client) throws Exception{ 217 SetDataBuilder setDataBuilder=client.setData(); 218 Stat stat=setDataBuilder.withVersion(-1).forPath("/node_135/node_135_2", "tianxiawushuang".getBytes()); 219 System.out.println("TestRurator.updateNodeValue()"+stat); 220 } 221 222 /** 223 * 判断一个节点是否存在 224 * @param client 225 * @throws Exception 226 */ 227 public static void isExistsNode(CuratorFramework client) throws Exception{ 228 ExistsBuilder existsBuilder=client.checkExists(); 229 Stat stat=existsBuilder.forPath("/node_135/node_135_2"); 230 if(stat!=null){ 231 System.out.println("TestRurator.isExistsNode()节点存在"); 232 }else{ 233 System.out.println("TestRurator.isExistsNode()节点不存在"); 234 } 235 } 236 /** 237 * 以判断节点是否存在,讲解异步调用的使用 238 * @param client 239 * @throws Exception 240 */ 241 public static void testSynCallBack(CuratorFramework client) throws Exception{ 242 ExistsBuilder existsBuilder=client.checkExists(); 243 existsBuilder.inBackground(new myBalckCallBack(),"sxf",es).forPath("/node_135/node_135_2"); 244 //System.out.println("TestRurator.testSynCallBack(异步调用返回)"+stat); 245 } 246 247 /** 248 * 异步调用回调接口的实现类 249 * @author sxf 250 * 251 */ 252 private static class myBalckCallBack implements BackgroundCallback{ 253 254 @Override 255 public void processResult(CuratorFramework client, CuratorEvent event) 256 throws Exception { 257 //回调类型 258 CuratorEventType curatorEventType=event.getType(); 259 //节点路径 260 String path=event.getPath(); 261 //节点列表 262 List<String> list=event.getChildren(); 263 //获取上下文 264 Object context=event.getContext(); 265 //获取返回碼 异步操作成功,返回0 266 int code=event.getResultCode(); 267 //获取数据内容 268 byte[] data=event.getData(); 269 270 StringBuffer sb=new StringBuffer(); 271 sb.append("curatorEventType="+curatorEventType).append("\n"); 272 sb.append("path="+path).append("\n"); 273 sb.append("list="+list).append("\n"); 274 sb.append("context="+context).append("\n"); 275 sb.append("data="+new String(data)).append("\n"); 276 277 System.out.println(sb.toString()); 278 } 279 280 } 281 282 283 /** 284 *节点事件监听(节点被创建,节点中存储的数据被修改) 285 * @param client 286 * @throws Exception 287 */ 288 public static void nodeValueChangeListern(CuratorFramework client) throws Exception{ 289 //注册监听 290 final NodeCache nodeCache =new NodeCache(client, "/node_135/node_135_2");; 291 //开启监听 292 nodeCache.start(); 293 //添加监听器(实现指定接口的实例对象) 294 nodeCache.getListenable().addListener(new NodeCacheListener() { 295 296 @Override 297 public void nodeChanged() throws Exception { 298 //获取当前节点的最近数据 299 byte[] bytes=nodeCache.getCurrentData().getData(); 300 System.out.println(new String(bytes)); 301 302 } 303 }); 304 } 305 306 /** 307 * 监听节点的子节点事件注册 308 * @param client 309 * @throws Exception 310 */ 311 public static void nodeChildListernDothing(CuratorFramework client) throws Exception{ 312 //注册监听器 313 //第三个参数:系统在监听到子节点列表发生变化时同时获取子节点的数据内容 314 final PathChildrenCache cache=new PathChildrenCache(client, "/node_135", true); 315 //开启监听 316 cache.start(); 317 //注册监听器 318 cache.getListenable().addListener(new PathChildrenCacheListener() { 319 320 @Override 321 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) 322 throws Exception { 323 Type type=event.getType(); 324 switch (event.getType()) { 325 case CHILD_ADDED://添加子节点 326 System.out.println("新添加子节点中存储的值==>"+new String(event.getData().getData())); 327 break; 328 329 case CHILD_UPDATED://子节点被修改 330 System.out.println("被修改的子节点中存储的值==>"+new String(event.getData().getData())); 331 break; 332 case CHILD_REMOVED://子节点被删除 333 System.out.println("被删除的字节点的路径==>"+event.getInitialData()); 334 break; 335 336 default: 337 break; 338 } 339 340 } 341 }); 342 } 343 344 }