9.1 基本使用
org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话
它提供以下几类主要方法 :
功能 |
描述 |
create |
在本地目录树中创建一个节点 |
delete |
删除一个节点 |
exists |
测试本地是否存在目标节点 |
get/set data |
从目标节点上读取 / 写数据 |
get/set ACL |
获取 / 设置目标节点访问控制列表信息 |
get children |
检索一个子节点上的列表 |
sync |
等待要被传送的数据 |
表 1 : ZooKeeper API 描述
9.2 增删改查znode数据
package cn.com.toto.zk;
import java.io.IOException;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;
public class SimpleDemo { //回话超时时间,设置为与系统默认时间一致 private static final int SESSION_TIMEOUT = 30000; //创建ZooKeeper实例 ZooKeeper zk; //创建Watcher实例 Watcher wh = new Watcher() {
@Override public void process(WatchedEvent event) { System.out.println(event.toString()); } };
//初始化ZooKeeper实例 private void createZKInstance() throws IOException { zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh); }
private void ZKOperations() throws KeeperException, InterruptedException { System.out.println("/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent"); zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("/n2.查看是否创建成功:"); System.out.println(new String(zk.getData("/zoo2", false, null))); System.out.println("/n3.修改节点数据"); zk.setData("/zoo2", "toto".getBytes(), -1); System.out.println("/n4.查看是否修改成功:"); System.out.println(new String(zk.getData("/zoo2", false, null))); System.out.println("/n5.删除节点"); zk.delete("/zoo2", -1); System.out.println("/n6.查看节点是否被删除:"); System.out.println("节点状态:[" + zk.exists("/zoo2", false) + "]"); }
private void ZKClose() throws InterruptedException { zk.close(); }
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { SimpleDemo dm = new SimpleDemo(); dm.createZKInstance(); dm.ZKOperations(); dm.ZKClose(); } } |
运行结果: /n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection 信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected 信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000 WatchedEvent state:SyncConnected type:None path:null /n2.查看是否创建成功: myData2 /n3.修改节点数据 /n4.查看是否修改成功: toto /n5.删除节点 /n6.查看节点是否被删除: 节点状态:[null] |
9.3 监听znode
Zookeeper的监听器工作机制
监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
监听器的注册是在获取数据的操作中实现:
getData(path,watch?)监听的事件是:节点数据变化事件
getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件
9.4其它案例
所需jar包:
图1 项目包结构
package cn.com.toto.zk;
import java.util.List; import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;
public class SimpleZkClient {
private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181"; private static final int sessionTimeout = 2000;
// latch就相当于一个对象锁,当latch.await()方法执行时,方法所在的线程会等待 //当latch的count减为0时,将会唤醒等待的线程 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null;
@Before public void init() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//事件监听回调方法 @Override public void process(WatchedEvent event) { if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) { System.out.println("countdown"); latch.countDown(); }
//收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(event.getType() + "---" + event.getPath()); System.out.println(event.getState()); } }); latch.await(); }
//创建数据节点到zk中 @Test public void testCreate() throws KeeperException, InterruptedException { //参数1:要创建的节点的路径 参数2:节点大数据参数3:节点的权限 参数4:节点的类型 String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上传的数据可以是任何类型,但都要转成byte zkClient.close(); }
//判断znode是否存在 @Test public void testExist() throws KeeperException, InterruptedException { Stat stat = zkClient.exists("/eclipse", false); System.out.println(stat == null ? "not exist" : "exist"); }
//获取znode下的孩子节点 @Test public void getChildren() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/", true); for(String child : children) { System.out.println(child); } Thread.sleep(Long.MAX_VALUE); }
//获取参数 @Test public void getData() throws KeeperException, InterruptedException { byte[] data = zkClient.getData("/eclipse", true, null); System.out.println(new String(data)); Thread.sleep(Long.MAX_VALUE); }
//删除znode @Test public void deleteZnode() throws InterruptedException, KeeperException { //参数2:指定要删除的版本,-1表示删除所有版本 zkClient.delete("/eclipse", -1); }
//设置参数 @Test public void setData() throws Exception { //要注意,这里的/zookeeper 要在zookeeper中的节点中有 zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/zookeeper", false, null); System.out.println(new String(data)); } } |
案例二
package cn.com.toto.zk;
import java.util.List; import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;
import com.sun.org.apache.bcel.internal.generic.NEW;
public class TestZKclient { static ZooKeeper zk = null;
public static void main(String[] args) throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {
@Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { countDownLatch.countDown(); } System.out.println(event.getPath()); System.out.println(event.getType()); try { zk.getChildren("/zookeeper", true); } catch (Exception e) { e.printStackTrace(); } } });
countDownLatch.await();
/** zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); **/
/** byte[] data = zk.getData("/myboys", true, null); System.out.println(new String(data,"UTF-8")); Thread.sleep(Long.MAX_VALUE); **/
/** zk.create("/myboys/wangkai", "测试型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); **/
/** List<String> children = zk.getChildren("/myboys", true); for(String child : children) { System.out.println(child); } **/
/**zk.delete("/myboys/wangkai", -1);**/
/**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/ /** byte[] data = zk.getData("/myboys", true, null); System.out.println(new String(data,"UTF-8")); **/
Stat stat = zk.exists("/mywives", true); System.out.println(stat == null ? "确实不存在" : "存在"); zk.close(); } } |
9.5 其它网络参考资料