【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】

【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)https://developer.aliyun.com/article/1395303

删除节点源码分析

由于基本的CRUDE操作逻辑实现比较类似,这里主要介绍下deletingChildrenIfNeeded是如何作用的,处理思路是在访问ZK出现NotEmptyException异常之后,这里在异常中判断是否设置删除子节点的操作并且重新发起请求。


catch ( KeeperException.NotEmptyException e )  
{  
    if ( deletingChildrenIfNeeded )  
    {  
        ZKPaths.deleteChildren(client.getZooKeeper(), path, true);  
    }  
    else  
    {  
        throw e;  
    }  
}

ZKPaths.deleteChildren(client.getZooKeeper(), path, true);  这个工具方法具体操作是利用递归的方式遍历所有子ZNode,然后挨个执行delete方法删除。

image.png

获取节点API

  • CuratorFramework
  • public GetDataBuilder getData();
  • GetDataBuilder
  • GetChildrenBuilder
  • public GetChildrenBuilder getChildren();
  • PathAndBytesable< T>
  • `public T forPath(String path) throws Exception;

GetDataBuilder 的类继承结构图如下:

image.png

下面是简单的API使用:


client.create().creatingParentsIfNeeded().withMode(CreateMode.CONTAINER).forPath("/app2", "Test".getBytes());
//1、查询数据:get
byte[] data = client.getData().forPath("/app2");
// KeeperErrorCode = NoNode for /app1
log.info("查询数据 {}", new String(data));
// 运行结果:查询数据 Test
//2、查询子节点:ls
List<String> list = client.getChildren().forPath("/app2");
log.info("查询子节点 {}", list);
//运行结果:查询子节点 []
client.close();
//3、查询节点状态信息:ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app2");

获取节点源码分析

获取节点的代码如下:


public byte[] forPath(String path) throws Exception
    {
        client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());
        path = client.fixForNamespace(path);
        byte[]      responseData = null;
        if ( backgrounding.inBackground() )
        {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
        }
        else
        {
            responseData = pathInForeground(path);
        }
        return responseData;
    }

前台调用方法操作如下:


private byte[] pathInForeground(final String path) throws Exception
    {
        OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
        byte[]      responseData = RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<byte[]>()
            {
                @Override
                public byte[] call() throws Exception
                {
                    byte[]      responseData;
                    if ( watching.isWatched() )
                    {
                        responseData = client.getZooKeeper().getData(path, true, responseStat);
                    }
                    else
                    {
                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
                    }
                    return responseData;
                }
            }
        );
        trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();
        return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
    }

修改节点API

  • CuratorFramework
  • public SetDataBuilder setData();
  • SetDataBuilder
  • SetDataBuilder setData()
  • PathAndBytesable< T>
  • `public T forPath(String path) throws Exception;


//1、修改节点数据(基本修改)
curatorFramework.setData().forPath("/app1", "333".getBytes(StandardCharsets.UTF_8));
//2、根据版本号修改
Stat stat1 = new Stat();
curatorFramework.getData().storingStatIn(stat1).forPath("/app1");
curatorFramework.setData().withVersion(stat1.getVersion()).forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));

image.png

修改节点源码分析

设计思路都是类似的,这里挑选forPath的相关代码进行展示。


@Override  
public Stat forPath(String path, byte[] data) throws Exception  
{  
    client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);  
    if ( compress )  
    {  
        data = client.getCompressionProvider().compress(path, data);  
    }  
    path = client.fixForNamespace(path);  
    Stat        resultStat = null;  
    if ( backgrounding.inBackground()  )  
    {  
        client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);  
    }  
    else  
    {  
        resultStat = pathInForeground(path, data);  
    }  
    return resultStat;  
}

监听节点

在使用原生的ZooKeeper的时候,是可以使用Watcher对节点进行监听的,但是唯一不方便的是一个Watcher只能生效一次,也就是说每次进行监听回调之后我们需要自己重新的设置监听才能达到永久监听的效果。

Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动再次监听,我们就不需要自己手动的重复监听了。

Curator支持的cache种类有3种Path Cache,Node Cache,Tree Cache。

1)Path Cache

Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。

Path Cache是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。

2)Node Cache

Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。

Node Cache是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。

3)Tree Cache

可以看做是上两种的合体,Tree Cache观察的是所有节点的所有数据。

Curator 拥有一套在节点上进行监听的API,具体操作是利用节点缓存上的监听器监听节点的数据变化。监听节点主要分为下面几个操作:

  • 监听单个节点
  • 监听所有子节点
  • 监听节点树

监听单个节点API

监听单个节点的案例代码如下:


//----------------- 监听单个节点 -----------------------------------
//1. 创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        System.out.println("节点变化了~");
        //获取修改节点后的数据
        byte[] data = nodeCache.getCurrentData().getData();
        System.out.println(new String(data));
    }
});
//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);

监听子节点API


//----------------- 监听子节点 -----------------------------------
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)  {
  System.out.println("子节点变化了~");
  System.out.println(event);
  //监听子节点的数据变更,并且拿到变更后的数据
  //1.获取类型
  PathChildrenCacheEvent.Type type = event.getType();
  //2.判断类型是否是update
  if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    System.out.println("数据变了!!!");
    byte[] data = event.getData().getData();
    System.out.println(new String(data));
  }
}
});
//3. 开启
pathChildrenCache.start();

监听节点树


//----------------- 监听节点树 -----------------------------------
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) {
        System.out.println("节点变化了");
        System.out.println(event);
    }
});
//3. 开启
treeCache.start();

NodeCache 源码解析

有关节点监听机制,和ZK 的 watch 机制也有关,下面来简单解析 NodeCache 相关源码实现。

初始化部分


public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)  
{  
    this.client = client.newWatcherRemoveCuratorFramework();  
    this.path = PathUtils.validatePath(path);  
    this.dataIsCompressed = dataIsCompressed;  
}

关键部分是构建了WatcherRemovalFacade监听器的门面对象,在Cache 发生变化之后会触发事件监听回调通知。


return new WatcherRemovalFacade(this);

start() 启动

NodeCache 使用必须要结合 xxx.start(); 方法。


public void     start(boolean buildInitial) throws Exception  
{  
  // 检查启动状态
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");  
  // 增加连接状态监听器
    client.getConnectionStateListenable().addListener(connectionStateListener);  
  // 初始化处理
    if ( buildInitial )  
    {  
        client.checkExists().creatingParentContainersIfNeeded().forPath(path);  
        internalRebuild();  
    }  
    reset();  
}

我们需要注意client.getConnectionStateListenable().addListener(connectionStateListener);  这一串代码实际上是注册到 CuratorFrameworkImpl内部的连接状态管理器 ConnectionStateManager

连接状态监听器的实现如下,主要是解决了原生客户端Watch只能使用一次的问题,这里通过监听状态变化并且结合CAS操作完成更新。


private ConnectionStateListener connectionStateListener = new ConnectionStateListener()  
{  
    @Override  
    public void stateChanged(CuratorFramework client, ConnectionState newState)  
    {  
        if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )  
        {  
            if ( isConnected.compareAndSet(false, true) )  
            {  
                try  
                {  
                    reset();  
                }  
                catch ( Exception e )  
                {  
                    ThreadUtils.checkInterrupted(e);  
                    log.error("Trying to reset after reconnection", e);  
                }  
            }  
        }  
        else  
        {  
            isConnected.set(false);  
        }  
    }  
};

上下两部分代码都调用了reset()方法,它在内部传递了两个对象 监听对象watcher 以及 回调对象backgroundCallback(异步回调),前者在一开始启动就会注册进来,而后者则需要返回数据的时候执行回调函数。


// reset() 方法内部实现
private void  reset() throws Exception  
{  
    if ( (state.get() == State.STARTED) && isConnected.get() )  
    {  
              client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);  
    }  
}

监听对象watcher 所干的事情就是不断重新执行reset方法,把监听器重新注册到对应的节点上面。


private Watcher watcher = new Watcher()  
{  
    @Override  
    public void process(WatchedEvent event)  
    {  
        try  
        {  
            reset();  
        }  
        catch(Exception e)  
        {  
            ThreadUtils.checkInterrupted(e);  
            handleException(e);  
        }  
    }  
};

异步回调逻辑

异步回调的任务是判断当前事件是获取数据还是检查是否存在,之后进行本地缓存数据的变更,以及刷新本地缓存数据。


private final BackgroundCallback backgroundCallback = new BackgroundCallback()  
{  
    @Override  
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception  
    {  
        processBackgroundResult(event);  
    }  
};
private void processBackgroundResult(CuratorEvent event) throws Exception
    {
      // 当发生获取数据或者是判断节点是否存在时候进行监听
        switch ( event.getType() )
        {
         // 响应状态为ok时,将刷新本地缓存的数据
            case GET_DATA:
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                   // 获取监听到的数据变动集合
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                  // 刷新本地缓存数据
                    setNewData(childData);
                }
                break;
            }
            case EXISTS:
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    setNewData(null);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    if ( dataIsCompressed )
                    {
                        client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                    else
                    {
                        client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                }
                break;
            }
        }
    }

调用setNewData(childData); 之后会刷新本地缓存数据:


private void setNewData(ChildData newData) throws InterruptedException  
{  
  // 比较最新数据和变更前的数据,查看是否有变更
    ChildData   previousData = data.getAndSet(newData);  
    if ( !Objects.equal(previousData, newData) )  
    {  
        listeners.forEach  
        (  
          // 用节点监听容器内部的监听器处理目录变更事件
            new Function<NodeCacheListener, Void>()  
            {  
                @Override  
                public Void apply(NodeCacheListener listener)  
                {  
                    try  
                    {  
                        listener.nodeChanged();  
                    }  
                    catch ( Exception e )  
                    {  
                        ThreadUtils.checkInterrupted(e);  
                        log.error("Calling listener", e);  
                    }  
                    return null;  
                }  
            }  
        );  
        if ( rebuildTestExchanger != null )  
        {  
            try  
            {  
                rebuildTestExchanger.exchange(new Object());  
            }  
            catch ( InterruptedException e )  
            {  
                Thread.currentThread().interrupt();  
            }  
        }  
    }  
}

如何触发注册的监听器?

我们回到 start() 启动这部分代码,来看下如何触发监听器:


// 增加连接状态监听器
client.getConnectionStateListenable().addListener(connectionStateListener);

这里注册到 CuratorFrameworkImpl内部的连接状态管理器 ConnectionStateManager,具体的注册过程如下:


public Listenable<ConnectionStateListener> getListenable()  
{  
    return listeners;  
}

这个 listeners 成员变量定义如下,可以看到它是一个监听器的管理容器:


private final UnaryListenerManager<ConnectionStateListener> listeners;

这个容器什么时候会通知注册在其中的监听器?

答案是在出现状态变更的时候:

org.apache.curator.framework.state.ConnectionStateManager#processEvents


listeners.forEach(listener -> listener.stateChanged(client, newState));

这部分内容在[[【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】]] 的通知机制中有详细介绍【参考:注册 ConnectionStateListener 通知部分】。

这里节省读者时间,我们直接看一个草图:

image.png

ConnectionStateManager 调用start 启动之后,会开启一个单线程线程池异步的轮询,并且在状态变更的时候回调UnaryListenerManager容器中注册的监听器。

以上就是关于如何触发注册的监听器的问题解答。

小结

节点监听缓存 NodeCache,内部关联Curator框架客户端CuratorFramework,通过节点内部的监听器容器 listeners(ListenerContainer)存放节点监听器。

添加节点监听器,实际上是注册到节点缓存的节点监听器容器ListenerContainer(CuratorFrameworkImpl内部的成员添加节点监听器,注册到节点缓存的节点监听器容器ListenerContainer)中。

启动节点监听器,注册节点监听器到CuratorFramework实现的连接状态管理器中ConnectionStateManager,如果需要则重新构建节点数据,同时重新注册节点监听器 CuratorWatcher,如果连接状态有变更, 重新注册节点监听器CuratorWatcher。

以上内容需要区分添加和启动过程,两者分别存储在两个不同的容器当中,这个添加过程类似先把鸡蛋放自己的篮子,启动之后再把自己篮子的鸡蛋倒入”机器“中运作。

当然上面的API没有分析PathChildrenCache,这里进行简单描述大致了解即可。

子目录监听器PathChildrenCache,主要成员变量为客户端框架实现CuratorFramework,子路径监听器容器 ListenerContainer(ListenerAble),及事件执行器CloseableExecutorService,事件操作集Set。

一级目录监听器PathChildrenCache,启动过程主要是注册连接状态监听器ConnectionStateListener,连接状态监听器根据连接状态来添加事件EventOperation和RefreshOperation操作到操作集。

事件操作EventOperation:主要是触发监听器的子目录事件操作;

事件刷新操作 RefreshOperation:主要是完成子目录的添加和刷新事件,并重新注册子目录监听器。 然后根据启动模式来决定是重添加事件操作,刷新、事件操作,或者重新构建,即刷新缓存路径数据,并注册刷新操作。

写在最后

这里还是吐槽 Curator 这代码设计挺绕的,还有很多贴合设计模式的古怪代码。

上一篇

[【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】]

参考资料

(3条消息) Curator之创建节点_curator创建节点_孤芳不自賞的博客-CSDN博客

ZooKeeper相关概念总结(入门) | JavaGuide(Java面试 + 学习指南)

www.jianshu.com/p/a864bf8a6…

donaldhan.github.io/zookeeper/2…

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
消息中间件 分布式计算 算法
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析(上)
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析
55 0
|
3月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
22 0
|
10天前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
25 11
|
11天前
|
存储 Java 网络安全
ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
【4月更文挑战第10天】ZooKeeper【搭建 03】apache-zookeeper-3.6.0 伪集群版(一台服务器实现三个节点的ZooKeeper集群)
24 1
|
2月前
|
消息中间件 存储 Kafka
Kafka【环境搭建 02】kafka_2.11-2.4.1 基于 zookeeper 搭建高可用伪集群(一台服务器实现三个节点的 Kafka 集群)
【2月更文挑战第19天】Kafka【环境搭建 02】kafka_2.11-2.4.1 基于 zookeeper 搭建高可用伪集群(一台服务器实现三个节点的 Kafka 集群)
140 1
|
4月前
|
监控 算法 网络协议
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析(下)
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析
35 1
|
4月前
|
分布式计算 数据管理 Java
Zookeeper(持续更新) VIP-01 Zookeeper特性与节点数据类型详解
官方文档上这么解释zookeeper,它是一个分布式协调框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
Zookeeper(持续更新) VIP-01 Zookeeper特性与节点数据类型详解
|
18天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
32 2
|
4月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
2月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1