概述
前面几篇系列博文我们熟悉了如何通过命令来操作ZK节点数据,下面我们来看下如何使用API来操作
主要两种方式
- 原生API
- Curator
今天我们来看下如何使用原生的API操作ZK
maven依赖
和 服务端的版本保持一致
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency>
验证
接下来我们使用单元测试来验证下原生API的对ZK 数据的增删改查
测试基类
我们来写下测试基类
package com.artisan.zk.originalClient; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @Slf4j public abstract class StandAloneBaseTest { private static final String ZK_ADDRESS = "192.168.126.131:2181"; private static final int SESSION_TIMEOUT = 30_000; private static CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() { return zooKeeper; } private static ZooKeeper zooKeeper ; private static Watcher watcher = event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.None){ log.info("ZK Connected"); countDownLatch.countDown(); } }; @Before public void init() throws IOException, InterruptedException { log.info("start to connect zk server: {}" , ZK_ADDRESS); zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, watcher); log.info("connecting to....{}", ZK_ADDRESS); countDownLatch.await(); } @After public void test(){ try { TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } }
为了方便测试,直接在init初始化方法中创建zookeeper实例 ,不要关闭~
ZK构造函数参数
connectString:ZooKeeper服务器列表
由英文逗号分开的host:port字符串组成,每一个都代表一台ZooKeeper机器,如
另外,也可以在connectString中设置客户端连接上ZooKeeper后的根目录,方法是在host:port字符串之后添加上这个根目录。例如,host1:port1,host2:port2,host3:port3/app/a,这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。例如,客户端对/foo/bar 的操作,最终创建/app/a/foo/bar, 这个目录也叫Chroot,即客户端隔离命名空间。host1:port1,host2:port2,host3:port3
sessionTimeout:会话的超时时间, “毫秒”为单位
在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性.
一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher:事件通知处理器
ZooKeeper允许客户端在构造方法中传入一个接口 watcher (org.apache. zookeeper.Watcher
)的实现类对象来作为默认的 Watcher事件通知处理器。
当然,该参数可以设置为null 以表明不需要设置默认的 Watcher处理器。
canBeReadOnly: 用于标识当前会话是否支持“read-only(只读)”模式。
boolean类型的参数
默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请
求)。
但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供),这就是 ZooKeeper的“read-only”模式。
sessionId和 sessionPasswd:会话ID和会话秘钥
这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。
具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实例的以下两个接口,即可获得当前会话的ID和秘钥:
long getSessionId(); byte[]getSessionPasswd( );
荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了
CRUD
同步创建节点
package com.artisan.zk.originalClient; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Test; @Slf4j public class BaseOperationStandAloneModeTest extends StandAloneBaseTest{ private static final String NODE_NAME = "/artisan-node"; @Test public void testCreate(){ try{ ZooKeeper zooKeeper = getZooKeeper(); String s = zooKeeper.create(NODE_NAME,"artisan-node-value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.info("create persistent node {} , result {}" , NODE_NAME, s ); }catch (Exception e){ log.error("create Exception {}", e.getMessage()); } } }
修改数据
@SneakyThrows @Test public void testSetData() { // 修改前数据 Stat stat = new Stat(); byte[] data = getZooKeeper().getData(NODE_NAME, null, stat); log.info("data before change: " + new String(data)); int version = stat.getVersion(); log.info("data version {} " , version); // 修改数据 Stat newStat = getZooKeeper().setData(NODE_NAME, "ARTISAN - NEW-SET-DATA".getBytes(), version); log.info("new stat version info {} " , newStat.getVersion()); log.info("data after change: {} " , new String(getZooKeeper().getData(NODE_NAME, null, newStat))); }
查询数据(不带watcher)
@SneakyThrows @Test public void testGetWithOutWatch(){ byte[] data = getZooKeeper().getData(NODE_NAME, null, null); log.info("data {}" , new String(data)); }
查询数据(带watcher)
@SneakyThrows @Test public void testGetWithWatch(){ Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { // 监听NodeDataChanged事件 if (event.getPath() != null && event.getPath().equals(NODE_NAME) && event.getType() == Watcher.Event.EventType.NodeDataChanged){ log.info("path {} changed watched " , NODE_NAME); // 监听一旦触发就会失效,因此需要重新监听 try { byte[] data = getZooKeeper().getData(NODE_NAME, this, null); log.info("监听触发后的操作-- data: {}",new String(data)); } catch (Exception e) { log.info("getData Error {} " , e.getMessage()); } } } }; // 获取节点数据 byte[] data = getZooKeeper().getData(NODE_NAME, watcher, null); log.info("data {}" , new String(data)); }
因为监听的是NodeDataChanged事件,因此我们再去调用修改数据的方法,或者在客户端手动修改数据
观察testGetWithWatch的日志
zk里查看数据
删除数据
@SneakyThrows @Test public void testDelete(){ // if the given version is -1, it matches any node's versions // -1 代表匹配所有版本,直接删除 // 任意大于 -1 的代表可以指定数据版本删除 getZooKeeper().delete(NODE_NAME,-1); }
查看客户端,已经删除
异步创建节点
@SneakyThrows @Test public void testCreateAsyn(){ getZooKeeper().create(NODE_NAME, "DATA_VALUE".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> { String currentThreadName = Thread.currentThread().getName(); log.info("currentThreadName {} , rc {} , path {} , ctx {} , name {} " , currentThreadName, rc , path ,ctx ,name ); }, "ARTISAN"); byte[] data = getZooKeeper().getData(NODE_NAME, null, null); log.info("data {}" , new String(data)); }
EventThread创建的节点 ,而非当前线程
行了 基本操作就这些,下篇继续