zk客户端与服务器连接
客户端和zk服务端链接是一个异步的过程
当连接成功后后,客户端会收的一个watch通知
/** * @Title: ZKConnectDemo.java * @Description: zookeeper 连接demo演示 */ public class ZKConnect implements Watcher { final static Logger log = LoggerFactory.getLogger(ZKConnect.class); public static final String zkServerPath = "192.168.254.130:2181"; // public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183"; public static final Integer timeout = 5000; public static void main(String[] args) throws Exception { /** * 客户端和zk服务端链接是一个异步的过程 * 当连接成功后后,客户端会收的一个watch通知 * * 参数: * connectString:连接服务器的ip字符串, * 比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181" * 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群 * 也可以在ip后加路径 * sessionTimeout:超时时间,心跳收不到了,那就超时 * watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写, * 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用 * sessionId:会话的id * sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话 */ ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect()); log.warn("客户端开始连接zookeeper服务器..."); log.warn("连接状态:{}", zk.getState()); new Thread().sleep(2000); log.warn("连接状态:{}", zk.getState()); } @Override public void process(WatchedEvent event) { log.warn("接受到watch通知:{}", event); } }
zk会话重连机制
获取到连接zk的会话id和会话密码,我们就可以再次回到会话中
/** * * @Title: ZKConnectDemo.java * @Description: zookeeper 恢复之前的会话连接demo演示 */ public class ZKConnectSessionWatcher implements Watcher { final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class); public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher()); new Thread().sleep(1000); long sessionId = zk.getSessionId(); String ssid = "0x" + Long.toHexString(sessionId); System.out.println(ssid); byte[] sessionPassword = zk.getSessionPasswd(); log.warn("客户端开始连接zookeeper服务器..."); log.warn("连接状态:{}", zk.getState()); log.warn("连接状态:{}", zk.getState()); new Thread().sleep(200); // 开始会话重连 log.warn("开始会话重连..."); ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword); log.warn("重新连接状态zkSession:{}", zkSession.getState()); new Thread().sleep(1000); log.warn("重新连接状态zkSession:{}", zkSession.getState()); } @Override public void process(WatchedEvent event) { log.warn("接受到watch通知:{}", event); } }
java操作zk,节点的创建,删除,设置
/** * * @Title: ZKConnectDemo.java * @Description: zookeeper 操作demo演示 */ public class ZKNodeOperator implements Watcher { private ZooKeeper zookeeper = null; public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; public ZKNodeOperator() {} public ZKNodeOperator(String connectString) { try { zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator()); } catch (IOException e) { e.printStackTrace(); if (zookeeper != null) { try { zookeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } /** * * @Title: ZKOperatorDemo.java * @Description: 创建zk节点 */ public void createZKNode(String path, byte[] data, List<ACL> acls) { String result = ""; try { /** * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数 * 参数: * path:创建的路径 * data:存储的数据的byte[] * acl:控制权限策略 * Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa * CREATOR_ALL_ACL --> auth:user:password:cdrwa * createMode:节点类型, 是一个枚举 * PERSISTENT:持久节点 * PERSISTENT_SEQUENTIAL:持久顺序节点 * EPHEMERAL:临时节点 * EPHEMERAL_SEQUENTIAL:临时顺序节点 */ result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT); //异步创建,CreateCallBack是一个回调函数 /* String ctx = "{'create':'success'}"; zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);*/ System.out.println("创建节点:\t" + result + "\t成功..."); new Thread().sleep(2000); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath); // 创建zk节点 zkServer.createZKNode("/testnode", "testnode".getBytes(), Ids.OPEN_ACL_UNSAFE); /** * 设置节点数据 * 参数: * path:节点路径 * data:数据 * version:数据状态 */ // Stat status = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 2); // System.out.println(status.getVersion()); /** * 同步删除数据,没有返回数据 * 参数: * path:节点路径 * version:数据状态 */ /* zkServer.createZKNode("/test-delete-node", "123".getBytes(), Ids.OPEN_ACL_UNSAFE); zkServer.getZookeeper().delete("/test-delete-node", 2); /** * 异步删除数据,在回调函数里面返回 * 参数: * path:节点路径 * version:数据状态 */ String ctx = "{'delete':'success'}"; zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx); Thread.sleep(2000);*/ } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } @Override public void process(WatchedEvent event) { } }
删除回调函数
public class DeleteCallBack implements VoidCallback { @Override public void processResult(int rc, String path, Object ctx) { System.out.println("删除节点" + path); System.out.println((String)ctx); } }
获取zk节点数据
使用CountDownLatch来监听
public class ZKGetNodeData implements Watcher { private ZooKeeper zookeeper = null; public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; //存放节点数据 private static Stat stat = new Stat(); public ZKGetNodeData() {} public ZKGetNodeData(String connectString) { try { zookeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData()); } catch (IOException e) { e.printStackTrace(); if (zookeeper != null) { try { zookeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDown = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath); /** * 参数: * path:节点路径 * watch:true或者false,注册一个watch事件 * stat:状态 */ byte[] resByte = zkServer.getZookeeper().getData("/bushro", true, stat); String result = new String(resByte); System.out.println("当前值:" + result); countDown.await(); } @Override public void process(WatchedEvent event) { try { if(event.getType() == EventType.NodeDataChanged){ ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath); byte[] resByte = zkServer.getZookeeper().getData("/bushro", false, stat); String result = new String(resByte); System.out.println("更改后的值:" + result); System.out.println("版本号变化dversion:" + stat.getVersion()); countDown.countDown(); } else if(event.getType() == EventType.NodeCreated) { } else if(event.getType() == EventType.NodeChildrenChanged) { } else if(event.getType() == EventType.NodeDeleted) { } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } }
获取zk子节点列表
public class ZKGetChildrenList implements Watcher { private ZooKeeper zookeeper = null; public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; public ZKGetChildrenList() {} public ZKGetChildrenList(String connectString) { try { zookeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList()); } catch (IOException e) { e.printStackTrace(); if (zookeeper != null) { try { zookeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDown = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath); /** * 参数: * path:父节点路径 * watch:true或者false,注册一个watch事件 */ // List<String> strChildList = zkServer.getZookeeper().getChildren("/bushro", true); // for (String s : strChildList) { // System.out.println(s); // } // 异步调用 String ctx = "{'callback':'ChildrenCallback'}"; //回调函数一个是有返回子节点信息(Children2CallBack),一个没有(ChildrenCallBack) // zkServer.getZookeeper().getChildren("/imooc", true, new ChildrenCallBack(), ctx); zkServer.getZookeeper().getChildren("/bushro", true, new Children2CallBack(), ctx); countDown.await(); } @Override public void process(WatchedEvent event) { try { if(event.getType()==EventType.NodeChildrenChanged){ System.out.println("NodeChildrenChanged"); ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath); List<String> strChildList = zkServer.getZookeeper().getChildren(event.getPath(), false); for (String s : strChildList) { System.out.println("子节点: "+s); } countDown.countDown(); } else if(event.getType() == EventType.NodeCreated) { System.out.println("NodeCreated"); } else if(event.getType() == EventType.NodeDataChanged) { System.out.println("NodeDataChanged"); } else if(event.getType() == EventType.NodeDeleted) { System.out.println("NodeDeleted"); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } }
回调函数
public class Children2CallBack implements Children2Callback { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { for (String s : children) { System.out.println(s); } System.out.println("ChildrenCallback:" + path); System.out.println((String)ctx); System.out.println("czxid:"+stat.getCzxid()); System.out.println("ctime:"+stat.getCtime()); System.out.println("mzxid:"+stat.getMzxid()); System.out.println("mtime:"+stat.getMtime()); System.out.println("pzxid:"+stat.getPzxid()); System.out.println("cversion:"+stat.getCversion()); System.out.println("version:"+stat.getVersion()); System.out.println("ephemeralOwner:"+stat.getEphemeralOwner()); System.out.println("numChildren:"+stat.getNumChildren()); } }
删除子节点后触发的事件
判断节否存在
public class ZKNodeExist implements Watcher { private ZooKeeper zookeeper = null; public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; public ZKNodeExist() {} public ZKNodeExist(String connectString) { try { zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist()); } catch (IOException e) { e.printStackTrace(); if (zookeeper != null) { try { zookeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDown = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZKNodeExist zkServer = new ZKNodeExist(zkServerPath); /** * 参数: * path:节点路径 * watch:watch */ Stat stat = zkServer.getZookeeper().exists("/bushro", true); if (stat != null) { System.out.println("查询的节点版本为dataVersion:" + stat.getVersion()); } else { System.out.println("该节点不存在..."); } countDown.await(); } @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeCreated) { System.out.println("节点创建"); countDown.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { System.out.println("节点数据改变"); countDown.countDown(); } else if (event.getType() == EventType.NodeDeleted) { System.out.println("节点删除"); countDown.countDown(); } } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } }
Acl相关操作
public class ZKNodeAcl implements Watcher { private ZooKeeper zookeeper = null; public static final String zkServerPath = "192.168.254.130:2181"; public static final Integer timeout = 5000; public ZKNodeAcl() {} public ZKNodeAcl(String connectString) { try { zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl()); } catch (IOException e) { e.printStackTrace(); if (zookeeper != null) { try { zookeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } public void createZKNode(String path, byte[] data, List<ACL> acls) { String result = ""; try { /** * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数 * 参数: * path:创建的路径 * data:存储的数据的byte[] * acl:控制权限策略 * Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa * CREATOR_ALL_ACL --> auth:user:password:cdrwa * createMode:节点类型, 是一个枚举 * PERSISTENT:持久节点 * PERSISTENT_SEQUENTIAL:持久顺序节点 * EPHEMERAL:临时节点 * EPHEMERAL_SEQUENTIAL:临时顺序节点 */ result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT); System.out.println("创建节点:\t" + result + "\t成功..."); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { ZKNodeAcl zkServer = new ZKNodeAcl(zkServerPath); /** * ====================== 创建node start ====================== */ //acl 任何人都可以访问 zkServer.createZKNode("/aclbushro", "test".getBytes(), Ids.OPEN_ACL_UNSAFE); //(测试1)自定义用户认证访问 List<ACL> acls = new ArrayList<ACL>(); Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456")); Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456")); //设置用户以及对应的权限 acls.add(new ACL(Perms.ALL, bushro1)); acls.add(new ACL(Perms.READ, bushro2)); acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2)); zkServer.createZKNode("/aclbushro/testdigest", "testdigest".getBytes(), acls); //注册过的用户必须通过addAuthInfo才能操作节点,参考命令行 addauth zkServer.getZookeeper().addAuthInfo("digest", "bushro1:123456".getBytes()); zkServer.createZKNode("/aclbushro/testdigest/childtest", "childtest".getBytes(), Ids.CREATOR_ALL_ACL); Stat stat = new Stat(); byte[] data = zkServer.getZookeeper().getData("/aclbushro/testdigest", false, stat); System.out.println(new String(data)); zkServer.getZookeeper().setData("/aclbushro/testdigest", "now".getBytes(), 1); //(测试二)ip方式的acl List<ACL> aclsIP = new ArrayList<ACL>(); Id ipId1 = new Id("ip", "192.168.1.6"); aclsIP.add(new ACL(Perms.ALL, ipId1)); zkServer.createZKNode("/aclbushro/iptest6", "iptest".getBytes(), aclsIP); //验证ip是否有权限 zkServer.getZookeeper().setData("/aclbushro/iptest6", "now".getBytes(), 1); Stat stat = new Stat(); byte[] data = zkServer.getZookeeper().getData("/aclbushro/iptest6", false, stat); System.out.println(new String(data)); System.out.println(stat.getVersion()); } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } @Override public void process(WatchedEvent event) { } }