【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)https://developer.aliyun.com/article/1395303
删除节点源码分析
由于基本的CRUDE操作逻辑实现比较类似,这里主要介绍下deletingChildrenIfNeeded
是如何作用的,处理思路是在访问ZK出现NotEmptyException异常之后,这里在异常中判断是否设置删除子节点的操作并且重新发起请求。
catch ( KeeperException.NotEmptyException e ) { if ( deletingChildrenIfNeeded ) { ZKPaths.deleteChildren(client.getZooKeeper(), path, true); } else { throw e; } }
ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
这个工具方法具体操作是利用递归的方式遍历所有子ZNode,然后挨个执行delete
方法删除。
获取节点API
- CuratorFramework:
public GetDataBuilder getData();
- GetDataBuilder
- GetChildrenBuilder
public GetChildrenBuilder getChildren();
- PathAndBytesable< T>:
- `public T forPath(String path) throws Exception;
GetDataBuilder 的类继承结构图如下:
下面是简单的API使用:
client.create().creatingParentsIfNeeded().withMode(CreateMode.CONTAINER).forPath("/app2", "Test".getBytes()); //1、查询数据:get byte[] data = client.getData().forPath("/app2"); // KeeperErrorCode = NoNode for /app1 log.info("查询数据 {}", new String(data)); // 运行结果:查询数据 Test //2、查询子节点:ls List<String> list = client.getChildren().forPath("/app2"); log.info("查询子节点 {}", list); //运行结果:查询子节点 [] client.close(); //3、查询节点状态信息:ls -s Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app2");
获取节点源码分析
获取节点的代码如下:
public byte[] forPath(String path) throws Exception { client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher()); path = client.fixForNamespace(path); byte[] responseData = null; if ( backgrounding.inBackground() ) { client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); } else { responseData = pathInForeground(path); } return responseData; }
前台调用方法操作如下:
private byte[] pathInForeground(final String path) throws Exception { OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground"); byte[] responseData = RetryLoop.callWithRetry ( client.getZookeeperClient(), new Callable<byte[]>() { @Override public byte[] call() throws Exception { byte[] responseData; if ( watching.isWatched() ) { responseData = client.getZooKeeper().getData(path, true, responseStat); } else { responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat); watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false); } return responseData; } } ); trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit(); return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData; }
修改节点API
- CuratorFramework:
public SetDataBuilder setData();
- SetDataBuilder
SetDataBuilder setData()
- PathAndBytesable< T>:
- `public T forPath(String path) throws Exception;
//1、修改节点数据(基本修改) curatorFramework.setData().forPath("/app1", "333".getBytes(StandardCharsets.UTF_8)); //2、根据版本号修改 Stat stat1 = new Stat(); curatorFramework.getData().storingStatIn(stat1).forPath("/app1"); curatorFramework.setData().withVersion(stat1.getVersion()).forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
修改节点源码分析
设计思路都是类似的,这里挑选forPath
的相关代码进行展示。
@Override public Stat forPath(String path, byte[] data) throws Exception { client.getSchemaSet().getSchema(path).validateGeneral(path, data, null); if ( compress ) { data = client.getCompressionProvider().compress(path, data); } path = client.fixForNamespace(path); Stat resultStat = null; if ( backgrounding.inBackground() ) { client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null); } else { resultStat = pathInForeground(path, data); } return resultStat; }
监听节点
在使用原生的ZooKeeper的时候,是可以使用Watcher对节点进行监听的,但是唯一不方便的是一个Watcher只能生效一次,也就是说每次进行监听回调之后我们需要自己重新的设置监听才能达到永久监听的效果。
Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动再次监听,我们就不需要自己手动的重复监听了。
Curator支持的cache种类有3种Path Cache,Node Cache,Tree Cache。
1)Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
Path Cache是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
2)Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
Node Cache是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
3)Tree Cache
可以看做是上两种的合体,Tree Cache观察的是所有节点的所有数据。
Curator 拥有一套在节点上进行监听的API,具体操作是利用节点缓存上的监听器监听节点的数据变化。监听节点主要分为下面几个操作:
- 监听单个节点
- 监听所有子节点
- 监听节点树
监听单个节点API
监听单个节点的案例代码如下:
//----------------- 监听单个节点 ----------------------------------- //1. 创建NodeCache对象 final NodeCache nodeCache = new NodeCache(client,"/app1"); //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); //获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据 nodeCache.start(true);
监听子节点API
//----------------- 监听子节点 ----------------------------------- //1.创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); //2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { System.out.println("子节点变化了~"); System.out.println(event); //监听子节点的数据变更,并且拿到变更后的数据 //1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); //2.判断类型是否是update if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); //3. 开启 pathChildrenCache.start();
监听节点树
//----------------- 监听节点树 ----------------------------------- //1. 创建监听器 TreeCache treeCache = new TreeCache(client,"/app2"); //2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) { System.out.println("节点变化了"); System.out.println(event); } }); //3. 开启 treeCache.start();
NodeCache 源码解析
有关节点监听机制,和ZK 的 watch 机制也有关,下面来简单解析 NodeCache 相关源码实现。
初始化部分
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) { this.client = client.newWatcherRemoveCuratorFramework(); this.path = PathUtils.validatePath(path); this.dataIsCompressed = dataIsCompressed; }
关键部分是构建了WatcherRemovalFacade
监听器的门面对象,在Cache 发生变化之后会触发事件监听回调通知。
return new WatcherRemovalFacade(this);
start() 启动
NodeCache 使用必须要结合 xxx.start();
方法。
public void start(boolean buildInitial) throws Exception { // 检查启动状态 Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); // 增加连接状态监听器 client.getConnectionStateListenable().addListener(connectionStateListener); // 初始化处理 if ( buildInitial ) { client.checkExists().creatingParentContainersIfNeeded().forPath(path); internalRebuild(); } reset(); }
我们需要注意client.getConnectionStateListenable().addListener(connectionStateListener);
这一串代码实际上是注册到 CuratorFrameworkImpl内部的连接状态管理器 ConnectionStateManager。
连接状态监听器的实现如下,主要是解决了原生客户端Watch只能使用一次的问题,这里通过监听状态变化并且结合CAS操作完成更新。
private ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) { if ( isConnected.compareAndSet(false, true) ) { try { reset(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Trying to reset after reconnection", e); } } } else { isConnected.set(false); } } };
上下两部分代码都调用了reset()
方法,它在内部传递了两个对象 监听对象watcher 以及 回调对象backgroundCallback(异步回调),前者在一开始启动就会注册进来,而后者则需要返回数据的时候执行回调函数。
// reset() 方法内部实现 private void reset() throws Exception { if ( (state.get() == State.STARTED) && isConnected.get() ) { client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } }
监听对象watcher 所干的事情就是不断重新执行reset
方法,把监听器重新注册到对应的节点上面。
private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { reset(); } catch(Exception e) { ThreadUtils.checkInterrupted(e); handleException(e); } } };
异步回调逻辑
异步回调的任务是判断当前事件是获取数据还是检查是否存在,之后进行本地缓存数据的变更,以及刷新本地缓存数据。
private final BackgroundCallback backgroundCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { processBackgroundResult(event); } }; private void processBackgroundResult(CuratorEvent event) throws Exception { // 当发生获取数据或者是判断节点是否存在时候进行监听 switch ( event.getType() ) { // 响应状态为ok时,将刷新本地缓存的数据 case GET_DATA: { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { // 获取监听到的数据变动集合 ChildData childData = new ChildData(path, event.getStat(), event.getData()); // 刷新本地缓存数据 setNewData(childData); } break; } case EXISTS: { if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { setNewData(null); } else if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { if ( dataIsCompressed ) { client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } else { client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } } break; } } }
调用setNewData(childData); 之后会刷新本地缓存数据:
private void setNewData(ChildData newData) throws InterruptedException { // 比较最新数据和变更前的数据,查看是否有变更 ChildData previousData = data.getAndSet(newData); if ( !Objects.equal(previousData, newData) ) { listeners.forEach ( // 用节点监听容器内部的监听器处理目录变更事件 new Function<NodeCacheListener, Void>() { @Override public Void apply(NodeCacheListener listener) { try { listener.nodeChanged(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Calling listener", e); } return null; } } ); if ( rebuildTestExchanger != null ) { try { rebuildTestExchanger.exchange(new Object()); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } } } }
如何触发注册的监听器?
我们回到 start()
启动这部分代码,来看下如何触发监听器:
// 增加连接状态监听器 client.getConnectionStateListenable().addListener(connectionStateListener);
这里注册到 CuratorFrameworkImpl内部的连接状态管理器 ConnectionStateManager,具体的注册过程如下:
public Listenable<ConnectionStateListener> getListenable() { return listeners; }
这个 listeners
成员变量定义如下,可以看到它是一个监听器的管理容器:
private final UnaryListenerManager<ConnectionStateListener> listeners;
这个容器什么时候会通知注册在其中的监听器?
答案是在出现状态变更的时候:
org.apache.curator.framework.state.ConnectionStateManager#processEvents
listeners.forEach(listener -> listener.stateChanged(client, newState));
这部分内容在[[【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】]] 的通知机制中有详细介绍【参考:注册 ConnectionStateListener 通知部分】。
这里节省读者时间,我们直接看一个草图:
ConnectionStateManager
调用start
启动之后,会开启一个单线程线程池异步的轮询,并且在状态变更的时候回调UnaryListenerManager容器中注册的监听器。
以上就是关于如何触发注册的监听器的问题解答。
小结
节点监听缓存 NodeCache,内部关联Curator框架客户端CuratorFramework,通过节点内部的监听器容器 listeners(ListenerContainer)存放节点监听器。
添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer(CuratorFrameworkImpl内部的成员添加节点监听器,注册到节点缓存的节点监听器容器ListenerContainer)中。
启动节点监听器,注册节点监听器到CuratorFramework实现的连接状态管理器中ConnectionStateManager,如果需要则重新构建节点数据,同时重新注册节点监听器 CuratorWatcher,如果连接状态有变更, 重新注册节点监听器CuratorWatcher。
以上内容需要区分添加和启动过程,两者分别存储在两个不同的容器当中,这个添加过程类似先把鸡蛋放自己的篮子,启动之后再把自己篮子的鸡蛋倒入”机器“中运作。
当然上面的API没有分析PathChildrenCache,这里进行简单描述大致了解即可。
子目录监听器PathChildrenCache,主要成员变量为客户端框架实现CuratorFramework,子路径监听器容器 ListenerContainer(ListenerAble),及事件执行器CloseableExecutorService,事件操作集Set。
一级目录监听器PathChildrenCache,启动过程主要是注册连接状态监听器ConnectionStateListener,连接状态监听器根据连接状态来添加事件EventOperation和RefreshOperation操作到操作集。
事件操作EventOperation:主要是触发监听器的子目录事件操作;
事件刷新操作 RefreshOperation:主要是完成子目录的添加和刷新事件,并重新注册子目录监听器。 然后根据启动模式来决定是重添加事件操作,刷新、事件操作,或者重新构建,即刷新缓存路径数据,并注册刷新操作。
写在最后
这里还是吐槽 Curator 这代码设计挺绕的,还有很多贴合设计模式的古怪代码。
上一篇
[【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】]
参考资料
(3条消息) Curator之创建节点_curator创建节点_孤芳不自賞的博客-CSDN博客