1 package com.yeepay.sxf.testrurator;
2
3 import java.util.List;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.Executors;
6
7 import org.apache.curator.RetryPolicy;
8 import org.apache.curator.framework.CuratorFramework;
9 import org.apache.curator.framework.CuratorFrameworkFactory;
10 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
11 import org.apache.curator.framework.api.BackgroundCallback;
12 import org.apache.curator.framework.api.CreateBuilder;
13 import org.apache.curator.framework.api.CuratorEvent;
14 import org.apache.curator.framework.api.CuratorEventType;
15 import org.apache.curator.framework.api.DeleteBuilder;
16 import org.apache.curator.framework.api.ExistsBuilder;
17 import org.apache.curator.framework.api.GetChildrenBuilder;
18 import org.apache.curator.framework.api.GetDataBuilder;
19 import org.apache.curator.framework.api.SetDataBuilder;
20 import org.apache.curator.framework.recipes.cache.NodeCache;
21 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
22 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
23 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
24 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
25 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
26 import org.apache.curator.retry.ExponentialBackoffRetry;
27 import org.apache.curator.retry.RetryNTimes;
28 import org.apache.curator.retry.RetryUntilElapsed;
29 import org.apache.zookeeper.CreateMode;
30 import org.apache.zookeeper.data.Stat;
31
32 /**
33 * 测试Rurator客户端测试
34 * @author sxf
35 *
36 */
37 public class TestRurator {
38
39 //创建一个线程池,让异步调用的的回调方法,申请线程池中的资源
40 private static ExecutorService es=Executors.newFixedThreadPool(5);
41 public static void main(String[] args) throws Exception {
42 //重试策略
43 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
44
45 //客户端构建器
46 Builder builder=CuratorFrameworkFactory.builder();
47
48 builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
49 builder.sessionTimeoutMs(5000);//会话的超时时间
50 builder.connectionTimeoutMs(5000);//链接的超时时间
51 builder.retryPolicy(retryPolicy);//指定重试策略
52
53 //构建客户端
54 CuratorFramework client=builder.build();
55 client.start();
56
57 //测试创建节点
58 //testCreateNode(client);
59
60 //测试删除节点
61 //testDeleteNode(client);
62
63 //获取子节点列表
64 //getChildList(client);
65
66 //获取节点中存取的值
67 //getNodeValue(client);
68
69 //修改节点中存储数据的值
70 //updateNodeValue(client);
71
72 //判断一个节点是否存在
73 //isExistsNode(client);
74
75 //测试异步调用使用方法
76 //testSynCallBack(client);
77
78 //测试节点数据变动和节点被创建的事件足册
79 //nodeValueChangeListern(client);
80
81 //给某个节点注册监听其子节点的活动
82 nodeChildListernDothing(client);
83 Thread.sleep(Integer.MAX_VALUE);
84 }
85
86
87 /**
88 * 建立客户端链接的第一种方法
89 * @throws InterruptedException
90 */
91 public void testCreateSession() throws InterruptedException{
92 //重试策略可以实现该接口。通过自定义规则,进行重试
93 //重试链接的策略的接口
94 //(最开始时间间隔,最多重试次数) 随着次数递增,时间间隔还会增加
95 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
96 //(最多重试次数,每次重试间隔的时间长度)
97 RetryPolicy retryPolicy2 =new RetryNTimes(5, 1000);
98 //(重试时间的总长度,每次重试的时间间隔) 这个基本可重试10次
99 RetryPolicy retryPolicy3 =new RetryUntilElapsed(10000, 1000);
100
101
102
103 //(ip地址端口,会话超时时间,建立连接超时时间,重试链接的策略)
104 CuratorFramework client=CuratorFrameworkFactory.newClient("10.151.30.75:2181", 5000,5000,retryPolicy);
105 //启动链接
106 client.start();
107
108 Thread.sleep(Integer.MAX_VALUE);
109
110 }
111
112 /**
113 * 建立客户端链接的第二种方法
114 */
115 public static void testCreateSession2(){
116 //重试策略
117 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
118
119 //客户端构建器
120 Builder builder=CuratorFrameworkFactory.builder();
121
122 builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
123 builder.sessionTimeoutMs(5000);//会话的超时时间
124 builder.connectionTimeoutMs(5000);//链接的超时时间
125 builder.retryPolicy(retryPolicy);//指定重试策略
126 builder.authorization(null);//给客户端添加权限AuthInfo 相关的知识在原生api里有讲过
127
128 // AuthInfo a=new AuthInfo("ip", "10.151.30.75:2181".getBytes());
129 // AuthInfo b=new AuthInfo("digest", "shangxiaofei:shangxiaofei".getBytes());
130
131 //构建客户端
132 CuratorFramework client=builder.build();
133
134 //建立链接
135 client.start();
136
137 }
138
139 /**
140 * 测试创建节点(调用顺序)
141 * 两种创建方式都可以
142 * @throws Exception
143 */
144 public static void testCreateNode(CuratorFramework client) throws Exception{
145 //创建节点构建器
146 // String path=client.create()
147 // .creatingParentsIfNeeded()//防止创建节点的父亲节点不存在,导致创建节点失败。如果父节点不存在,先创建父节点,再创建子节点
148 // .withMode(CreateMode.PERSISTENT)//指定节点的类型(持久节点,临时节点,持久节并顺序节点,临时并顺序节点)
149 // .forPath("/node_134", "xiaoshuai".getBytes());//节点路径,节点中存储的数据
150 //
151 CreateBuilder builder=client.create();
152 builder.creatingParentsIfNeeded();
153 builder.withMode(CreateMode.PERSISTENT);
154 builder.withACL(null);//添加这个节点的权限。相关内容,在原生zookeeper客户端有讲解。上上篇博客
155 /**
156 * //基于ip的权限,意味着这个ip的客户端对此节点有读取权限
157 * ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75"));
158 * //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限
159 * ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")));
160 *
161 * List<ACL> myaclAcls=new ArrayList<ACL>();
162 * myaclAcls.add(ipacl);
163 * myaclAcls.add(digetacl);
164 */
165 String pathString=builder.forPath("/node_135/node_135_2", "zhengjiuzhonghua".getBytes());
166
167
168
169 System.out.println("TestRurator.testCreateNode()"+pathString);
170 }
171
172 /**
173 * 测试删除节点(调用顺序不能乱)
174 * @param client
175 * @throws Exception
176 */
177 public static void testDeleteNode(CuratorFramework client) throws Exception{
178 DeleteBuilder delteBuilder=client.delete();
179 delteBuilder.guaranteed();//保证节点必须删除,如果删除出现错误,则后台程序会不断去尝试删除。
180 delteBuilder.deletingChildrenIfNeeded();//如果存在子节点,先删除子节点
181 delteBuilder.withVersion(-1);//指定版本号
182 delteBuilder.forPath("/node_135");//指定路径
183 }
184
185 /**
186 * 获取子节点列表
187 * @param client
188 * @throws Exception
189 */
190 public static void getChildList(CuratorFramework client) throws Exception{
191 GetChildrenBuilder getChildBuilder=client.getChildren();
192 List<String> nodelist=getChildBuilder.forPath("/");
193 System.out.println("TestRurator.getChildList()"+nodelist);
194
195 }
196 /**
197 * 获取节点中存取的值
198 * @throws Exception
199 */
200 public static void getNodeValue(CuratorFramework client) throws Exception{
201
202 GetDataBuilder dataBuilder= client.getData();
203
204 Stat stat=new Stat();
205 dataBuilder.storingStatIn(stat);
206 byte[] bytes=dataBuilder.forPath("/node_135/node_135_2");
207 System.out.println("TestRurator.getNodeValue(data==>)"+new String(bytes));
208 System.out.println("TestRurator.getNodeValue()"+stat);
209 }
210
211 /**
212 * 修改节点中的数据
213 * @param client
214 * @throws Exception
215 */
216 public static void updateNodeValue(CuratorFramework client) throws Exception{
217 SetDataBuilder setDataBuilder=client.setData();
218 Stat stat=setDataBuilder.withVersion(-1).forPath("/node_135/node_135_2", "tianxiawushuang".getBytes());
219 System.out.println("TestRurator.updateNodeValue()"+stat);
220 }
221
222 /**
223 * 判断一个节点是否存在
224 * @param client
225 * @throws Exception
226 */
227 public static void isExistsNode(CuratorFramework client) throws Exception{
228 ExistsBuilder existsBuilder=client.checkExists();
229 Stat stat=existsBuilder.forPath("/node_135/node_135_2");
230 if(stat!=null){
231 System.out.println("TestRurator.isExistsNode()节点存在");
232 }else{
233 System.out.println("TestRurator.isExistsNode()节点不存在");
234 }
235 }
236 /**
237 * 以判断节点是否存在,讲解异步调用的使用
238 * @param client
239 * @throws Exception
240 */
241 public static void testSynCallBack(CuratorFramework client) throws Exception{
242 ExistsBuilder existsBuilder=client.checkExists();
243 existsBuilder.inBackground(new myBalckCallBack(),"sxf",es).forPath("/node_135/node_135_2");
244 //System.out.println("TestRurator.testSynCallBack(异步调用返回)"+stat);
245 }
246
247 /**
248 * 异步调用回调接口的实现类
249 * @author sxf
250 *
251 */
252 private static class myBalckCallBack implements BackgroundCallback{
253
254 @Override
255 public void processResult(CuratorFramework client, CuratorEvent event)
256 throws Exception {
257 //回调类型
258 CuratorEventType curatorEventType=event.getType();
259 //节点路径
260 String path=event.getPath();
261 //节点列表
262 List<String> list=event.getChildren();
263 //获取上下文
264 Object context=event.getContext();
265 //获取返回碼 异步操作成功,返回0
266 int code=event.getResultCode();
267 //获取数据内容
268 byte[] data=event.getData();
269
270 StringBuffer sb=new StringBuffer();
271 sb.append("curatorEventType="+curatorEventType).append("\n");
272 sb.append("path="+path).append("\n");
273 sb.append("list="+list).append("\n");
274 sb.append("context="+context).append("\n");
275 sb.append("data="+new String(data)).append("\n");
276
277 System.out.println(sb.toString());
278 }
279
280 }
281
282
283 /**
284 *节点事件监听(节点被创建,节点中存储的数据被修改)
285 * @param client
286 * @throws Exception
287 */
288 public static void nodeValueChangeListern(CuratorFramework client) throws Exception{
289 //注册监听
290 final NodeCache nodeCache =new NodeCache(client, "/node_135/node_135_2");;
291 //开启监听
292 nodeCache.start();
293 //添加监听器(实现指定接口的实例对象)
294 nodeCache.getListenable().addListener(new NodeCacheListener() {
295
296 @Override
297 public void nodeChanged() throws Exception {
298 //获取当前节点的最近数据
299 byte[] bytes=nodeCache.getCurrentData().getData();
300 System.out.println(new String(bytes));
301
302 }
303 });
304 }
305
306 /**
307 * 监听节点的子节点事件注册
308 * @param client
309 * @throws Exception
310 */
311 public static void nodeChildListernDothing(CuratorFramework client) throws Exception{
312 //注册监听器
313 //第三个参数:系统在监听到子节点列表发生变化时同时获取子节点的数据内容
314 final PathChildrenCache cache=new PathChildrenCache(client, "/node_135", true);
315 //开启监听
316 cache.start();
317 //注册监听器
318 cache.getListenable().addListener(new PathChildrenCacheListener() {
319
320 @Override
321 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
322 throws Exception {
323 Type type=event.getType();
324 switch (event.getType()) {
325 case CHILD_ADDED://添加子节点
326 System.out.println("新添加子节点中存储的值==>"+new String(event.getData().getData()));
327 break;
328
329 case CHILD_UPDATED://子节点被修改
330 System.out.println("被修改的子节点中存储的值==>"+new String(event.getData().getData()));
331 break;
332 case CHILD_REMOVED://子节点被删除
333 System.out.println("被删除的字节点的路径==>"+event.getInitialData());
334 break;
335
336 default:
337 break;
338 }
339
340 }
341 });
342 }
343
344 }