【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】

引言

本文介绍后台任务延迟队列的“元素” 后台任务构造器 以及Curator 对于常见的ZK节点操作封装API。后台任务构造器对应了和ZK交互的常见”后台“操作,比如创建和销毁Watch,而ZK节点操作API涉及各种建造者模式的应用。可以说,Curator 整个框架各种地方都有建造者模式的身影。

Curator除了对于ZK本身交互和操作封装之外,还引入了Cache的概念来实现对ZooKeeper服务器端进行事件监听,本质上就是构建本地缓存,在远程节点出现”状态“变动的时候进行”联动“触发各种事件。

不过,Cache 的部分个人认为并不是很重要的内容,更多重心还是在分布式锁,再加上查询各种资料本身应用场景也比较少,因此放到了文章最后分析,读者可以按需阅读。

相关应用场景和重要概念

本文的源码分析涉及到 ZK 的应用场景和重要概念,这里先补充相关概念,为后面的源码分析铺垫。

相关应用场景

ZK 中可以完成数据发布订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举,分布式锁和分布式队列。

命名服务: 使用 ZooKeeper 的顺序节点生成全局唯一 ID。

数据发布/订阅:通过 Watcher 机制 可以很方便地实现数据发布/订阅。

分布式锁:通过创建唯一节点获得分布式锁,通常会使用临时节点的方式持有锁,特点是在节点宕机之后会自动释放。

重要概念

ZNode 概念

Zookeeper 的数据模型使用的是多叉树结构,每个节点上面可以存储任意类型的数据,比如数组、字符串、二进制序列。由于是树状节点,每个节点还可以有子节点。

树状节点的组成每个节点可以有N个子节点,根节点为 "/",每个数据节点在Zookeeper中都被叫做ZNode,整个树的组成有点类似Linux的文件系统结构。

image.png

注意 ZNode 通常用于临时创建,适合用于比较小体积的锁应用,不建议存储过大的业务数据,不要把过大的数据放到 ZNode上。

ZNode 数据节点

Zookeeper 的数据节点 ZNode 是最小组成单元,ZNode 是 ZK 实现分布式锁的重要基础,它主要有如下分类:

  • 持久(PERSISTENT)节点:一旦创建就会一直存在,直到 ZK集群宕机才会删除。
  • 临时(EPHEMERAL)节点:临时节点的生命周期是与 客户端会话(session) 绑定,会话消失则节点消失,
  • 持久顺序(PERSISTENT_SEQUENTIAL)节点:在持久节点的特性上,子节点的名称依然有顺序性,比如 /node1/app0000000001/node1/app0000000002
  • 临时顺序(EPHEMERAL_SEQUENTIAL)节点:除了具备临时(EPHEMERAL)节点的特性之外,子节点的名称还具有顺序性。

Watcher(事件监听器)

Watcher 事件监听器是 Zookeeper 当中非常重要的特性,ZK 允许用户在指定的 Znode 上面注册监听器 Watcher,特定的事件触发时候,ZK服务端会把事件通知到注册Watcher的客户端,。事件监听器也是分布式协调服务的重要组成部分。

在 Curator 中,Watcher 事件监听器是不同客户端监听分布式锁释放的重要应用组件。

ZK可视化客户端 PrettyZoo

为了方便我们调试源码的同时观察ZK节点变更,这里推荐使用 PrettyZoo 客户端。

PrettyZoo 是一个基于 Apache Curator 和 JavaFX 实现的 Zookeeper 图形化管理客户端。使用了 Java 的模块化(Jigsaw)技术,并基于 JPackage 打包了多平台的可运行文件(无需要额外安装 Java 运行时)。

目前已提供了 mac(dmg 文件)、Linux(deb 和 rpm 文件)、windows(msi 文件) 的安装包,下载地址

image.png

个人为Win系统,选择win.msi 的安装包,安装并启动并且就进入到主页面

image.png

完成配置之后进行连接,最终的连接效果如图:

image.png

前面的铺垫已经完成,下面正式进入主题。

后台任务构造器

[【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】]当中,我们介绍了Curator实例化、Zookeeper连接以及各种组件初始化和启动过程,其中就有一个后台执行操作队列不断执行后台操作。

image.png

OperationAndData中的 BackgroundOperation ,封装各种常见ZK指令的构造器。

image.png

下面以 BackgroundOperation 作为切入点,看看它的构造器是如何实现的?

后台操作接口 BackgroundOperation< T >

BackgroundOperation 是后台操作接口的 顶级接口,其中只有一个方法,它接收 OperationAndData 作为请求参数。

org.apache.curator.framework.imps.BackgroundOperation


interface BackgroundOperation<T>  
{  
    public void performBackgroundOperation(OperationAndData<T> data) throws Exception;  
}

这种设计让我联想到 Executor 的设计(略显牵强),Runnable 是线程的执行操作分离抽象,与之对应的OperationAndData是对于后台操作的抽象。


public interface Executor {  
     void execute(Runnable command);  
}

后台事件数据对象 OperationAndData

org.apache.curator.framework.imps.OperationAndData

OperationAndData 对象的代码略多,这里拆分介绍,首先来看下继承结构, OperationAndData 最终被存储在后台线程执行的操作队列backgroundOperations,backgroundOperations使用JDK原生并发延迟队列DelayQueue作为基础。

按照 DelayQueue 的设计存储要求,内部元素必须实现Delayed接口以支持延迟操作,除此之外, OperationAndData 还实现了 RetrySleeper 接口,从英文名称也可以大致猜出它是 对重试政策的抽象化


class OperationAndData<T> implements Delayed, RetrySleeper


//后台线程执行的操作队列
backgroundOperations = new DelayQueue<OperationAndData<?>>();

下面来看下相关成员变量定义:


/**
初始化为0,每执行一次 reset() 重置,此计数器的值会+1
*/
private static final AtomicLong nextOrdinal = new AtomicLong();  
/**
BackgroundOperation 相关引用
*/
private final BackgroundOperation<T> operation;  
/**
后台操作的相关对象
*/
private final T data;  
/**
异步后台操作
*/
private final BackgroundCallback callback;  
/**
执行开始时间
*/
private final long startTimeMs = System.currentTimeMillis();  
/**
错误回调
*/
private final ErrorCallback<T> errorCallback;  
/**
重试次数
*/
private final AtomicInteger retryCount = new AtomicInteger(0);  
/**
休眠时间
*/
private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
/**
执行顺序
*/
private final AtomicLong ordinal = new AtomicLong();  
/**
上下文
*/
private final Object context;  
/**
需要连接
*/
private final boolean connectionRequired;

重要的方法如下,执行后台操作就是调用operation.performBackgroundOperation(this);  方法:


void callPerformBackgroundOperation() throws Exception  
{  
    operation.performBackgroundOperation(this);  
}

BackgroundOperation 的实现类非常多。这里举几个例子。

BackgroundSyncImpl

从单词意思来看,这个实现是负责后台同步的。


class BackgroundSyncImpl implements BackgroundOperation<String>
{
    private final CuratorFrameworkImpl client;
    private final Object context;
    BackgroundSyncImpl(CuratorFrameworkImpl client, Object context)
    {
        this.client = client;
        this.context = context;
    }
    @Override
    public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
    {
        final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
        final String data = operationAndData.getData();
        client.getZooKeeper().sync
        (
            data,
            new AsyncCallback.VoidCallback()
            {
                @Override
                public void processResult(int rc, String path, Object ctx)
                {
                    trace.setReturnCode(rc).setRequestBytesLength(data).commit();
                    CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null);
                    client.processBackgroundOperation(operationAndData, event);
                }
            },
            context
        );
    }
}

这里通过构造CuratorEventImpl实现类,把operationAndDataevent事件传给CuratorFrameworkImpl

RemoveWatchesBuilderImpl

RemoveWatchesBuilderImpl定义了删除Watcher监听器的后台操作,简单看下相关代码实现。


public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String>, ErrorListenerPathable<Void>  
{
   private CuratorFrameworkImpl client;
    private Watcher watcher;
    private CuratorWatcher curatorWatcher;
    private WatcherType watcherType;
    private boolean guaranteed;
    private boolean local;
    private boolean quietly;    
    private Backgrounding backgrounding;
    @Override
    public void performBackgroundOperation(final OperationAndData<String> operationAndData)
            throws Exception
    {
        try
        {
            final TimeTrace   trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");
            AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()
            {
                @Override
                public void processResult(int rc, String path, Object ctx)
                {
                    trace.commit();
                    CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null, null);
                    client.processBackgroundOperation(operationAndData, event);
                }
            };
            ZooKeeper zkClient = client.getZooKeeper();
            // 命名空间 Watch 
            NamespaceWatcher namespaceWatcher = makeNamespaceWatcher(operationAndData.getData());
            if(namespaceWatcher == null)
            {
              // ZK 客户端移除 Watch 
                zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());
            }
            else
            {
                zkClient.removeWatches(operationAndData.getData(), namespaceWatcher, watcherType, local, callback, operationAndData.getContext());
            }
        }
        catch ( Throwable e )
        {
            backgrounding.checkError(e, null);
        }
    }
}

执行后台事件 CuratorFrameworkImpl#processBackgroundOperation

所有的后台任务操作都会回调Curator 实例CuratorFrameworkImplprocessBackgroundOperation方法,下面简单分析相关方法细节。

org.apache.curator.framework.imps.CuratorFrameworkImpl#processBackgroundOperation

主要的逻辑如下:

  1. 判断是否初次执行,初次执行会进行连接状态检查呵护后续的重试判断处理。
  2. 校验是否需要重试。
  3. 检查是否发送回调。
  4. 监听器事件回调通知(这里会进行事件通知回调)。


<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
  boolean isInitialExecution = (event == null);
  // 如果是初次执行
  if ( isInitialExecution )
  {
    // 初次执行会进行连接状态检查呵护后续的重试判断处理
    performBackgroundOperation(operationAndData);
    return;
  }
  boolean doQueueOperation = false;
  do                                                                                          
  {
    // 校验是否需要重试
    if ( RetryLoop.shouldRetry(event.getResultCode()) )
    {
      doQueueOperation = checkBackgroundRetry(operationAndData, event);
      break;
    }
    // 检查是否发送回调
    if ( operationAndData.getCallback() != null )
    {
      // 发送后台回调
      sendToBackgroundCallback(operationAndData, event);
      break;
    }
    // 监听器事件回调通知
    processEvent(event);
  }
  while ( false );
  if ( doQueueOperation )
  {
    queueOperation(operationAndData);
  }
}

真心看不懂 while ( false );  这的写法=-=。

以上简单分析了后台任务构造器以及如何执行,设计比较好懂,这里就不做过多分析了。

Curator 节点操作

创建节点API

这里以个人阅读的 Curator 4.3.0 版本为例,创建节点的 API 涉及下面几个组件:

  • CuratorFramework
  • public CreateBuilder create();
  • CreateBuilder
  • public ProtectACLCreateModePathAndBytesable createParentsIfNeeded();
  • CreateModable
  • public T withMode(CreateMode mode);
  • PathAndBytesable< T>
  • public T forPath(String path, byte[] data) throws Exception;
  • public T forPath(String path) throws Exception;

下面是几个常见的API使用Demo:

创建一个节点,初始内容为空


client.create().forPath(path);


// 创建一个节点,初始内容为空  
client.create().forPath("/tmp");

从结果可以看到,如果没有设置节点属性,那么Curator默认创建的是持久节点

创建一个节点,附带初始内容


// 创建一个节点,附带初始内容  
client.create().forPath("/tmp", "init".getBytes());;

和上面的区别就是在对应的节点写入内容,注意 Curator 使用了 Zookeeper 的原始API风格。


//  KeeperErrorCode = NodeExists for /tmp

由于上面已经创建过节点,这里创建节点出现报错,我们在Pretty客户端中执行删除节点操作。删除之后重新执行,"/tmp"节点被正确创建。

image.png

创建一个临时节点,初始内容为空


client.create().withMode(CreateMode.EPHEMERAL).forPath(path);

临时节点属于会话级别,我们在编写Demo代码的时候,如果没有手动 close 客户端,那么服务端会判断客户端会在会话超时之后自动释放临时节点

临时节点的好处是即使ZK集群宕机,也可以保证及时释放,防止锁长期占用,适合作为分布式锁设计使用。


// 创建一个临时节点,初始内容为空  
client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp");  
// 如果立即Close,那么临时节点会立即释放
client.close();

如果我们 close 客户端,那么临时节点的创建和销毁会立即触发,在 prettyZoo 看来就是“什么也没发送过”。

创建一个临时节点,并自动递归创建父节点

使用ZooKeeper的过程中,开发人员经常会碰到NoNodeException异常,其中一个可能的原因就是试图对一个不存在的父节点创建子节点。


// 试图对一个不存在的父节点创建子节点  
client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp/childNode");  
// 报错KeeperErrorCode = NoNode for /temp/childNode

在使用Curator之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动递归创建所有需要的父节点:


lient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

image.png

创建节点源码分析

节点API的涉及都比较简单,CreateBuilder 的继承结构图如下:

image.png

CreateBuilder 的对应实现类为 CreateBuilderImpl,我们通过一串API调用Demo来简单分析构建过程:


client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/temp/childNode", "init".getBytes());

org.apache.curator.framework.imps.CuratorFrameworkImpl#create

  1. 检查ZK的连接状态。
  2. new CreateBuilderImpl,,这里的 this 为 CuratorFrameworkImpl ,也就是client 客户端实例。


public CreateBuilder create()  
{  
  // 检查ZK的连接状态
    checkState();  
    // new构造器, this 为 CuratorFrameworkImpl 也就是client 客户端实例
    return new CreateBuilderImpl(this);  
}

org.apache.curator.framework.imps.CreateBuilderImpl#creatingParentsIfNeeded

下面代码的关键是createParentsIfNeeded = true; 这一行,其他代码可以忽略。


@Override  
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()  
{  
    createParentsIfNeeded = true;  
    return new ProtectACLCreateModeStatPathAndBytesable<String>()  
    {  
        //....
    };  
}

withMode(CreateMode.EPHEMERAL)

这一部分属于ZK的客户端提供的,CreateMode 一般用的比较多的是临时节点

具体使用这里不一一介绍,简单看下源码中的英文注释很容易理解不同模式的作用。

image.png

默认为 PERSISTENT 持久节点。

org.apache.curator.framework.imps.CreateBuilderImpl#forPath(java.lang.String, byte[])

forPath 对应了创建节点的最终操作,这里大致逻辑如下:

  1. 判断是否需要压缩。
  2. acl 权限检查。
  3. 判断是否执行回调。
  4. 核心:使用 ZooKeeper 的顺序节点生成全局唯一 ID。


@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
  // 判断是否需要压缩
  if ( compress )
  {
    data = client.getCompressionProvider().compress(givenPath, data);
  }
  final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
  List<ACL> aclList = acling.getAclList(adjustedPath);
  client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
  String returnPath = null;
  // 后台回调
  if ( backgrounding.inBackground() )
  {
    pathInBackground(adjustedPath, data, givenPath);
  }
  else
  {
    // forpath 会走这一段逻辑
    // 
    String path = protectedPathInForeground(adjustedPath, data, aclList);
    returnPath = client.unfixForNamespace(path);
  }
  return returnPath;
}

顺着String path = protectedPathInForeground(adjustedPath, data, aclList);这一段代码一路往下探,找到对应截图部分的代码:

org.apache.curator.framework.imps.CreateBuilderImpl#pathInForeground

image.png

这里使用RetryLoop.callWithRetry嵌套了一个 Callable操作,但是这个操作并没有做任何多线程操作,而是进行了result = proc.call();调用???不太理解这一段封装处理的含义,于是看了下JavaDoc解释:

在Zookeeper上执行操作的机制,可安全防止断开连接和 "可恢复 "错误。如果在操作过程中出现异常,RetryLoop将处理该异常,检查当前重试策略,并尝试重新连接或重新抛出异常。

典型用法如下:


RetryLoop retryLoop = client.newRetryLoop();
  while ( retryLoop.shouldContinue() )
  {
      try
      {
          // do your work
          ZooKeeper zk = client.getZooKeeper();    // it's important to re-get the ZK instance in case there was an error and the instance was re-created
          retryLoop.markComplete();
      }
      catch ( Exception e )
      {
          retryLoop.takeException(e);
      }
  }

说白了,它主要封装了类似下面这样的操作:


int count = 0;  
while (count < 3){  
  try{  
//                ZooKeeper zk = client.getZooKeeper();  
//                zk.create(final String path, byte data[], List<ACL > acl,  
//                        CreateMode createMode)  
    break;  
  }catch (Exception e){  
    count++;  
    continue;            
}  
}

结果就是把重试的重复代码做了一个封装,其中call()方法则是具体委托ZK的客户端进行节点的创建操作了,这里的ttl-1

image.png

截图对应代码如下:


createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);

此外,个人在阅读代码过程中,发现在进行path的字符串拼接操作的时候,这里有一个小小的  StringBuilder 优化。


// Avoid internal StringBuilder's buffer reallocation by specifying the max path length  
StringBuilder path = new StringBuilder(maxPathLength);

至此,创建节点的相关操作源码已经了解,下面我们来过一下删除的相关API操作和源码。

删除节点API

  • CuratorFramework
  • public CreateBuilder create();
  • DeleteBuilder
  • ChildrenDeletable
  • public BackgroundVersionable deletingChildrenIfNeeded();
  • PathAndBytesable< T>
  • public T forPath(String path, byte[] data) throws Exception;
  • `public T forPath(String path) throws Exception;

DeleteBuilder 的继承结构图如下:

image.png

删除节点的API较为简单,这里直接贴出相关的Demo代码。


// 先创建一个持久节点  
client.create().forPath("/create");  
// 删除节点  
client.delete().forPath("/create");  
// 如果节点不存在:KeeperErrorCode = NoNode for /create


// 先创建一个持久节点  
client.create().creatingParentsIfNeeded().forPath("/create/child0");  
client.create().creatingParentsIfNeeded().forPath("/create/child1");  
// 删除并且判断是否需要同时删除子节点,如果有子节点并且确定一并删除需要添加 
deletingChildrenIfNeededclient.delete().deletingChildrenIfNeeded().forPath("/create");

image.png

【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)https://developer.aliyun.com/article/1395304

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
6月前
|
存储 Java Linux
【Zookeeper】Introduction to Apache Curator
【Zookeeper】Introduction to Apache Curator
132 0
|
6月前
|
消息中间件 分布式计算 算法
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析(上)
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析
544 0
|
6月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
102 0
|
3月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
91 0
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
47 1
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
5月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
52 1
|
5月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
128 0
|
6月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
74 11
|
6月前
|
Dubbo Java 应用服务中间件
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)
此系列文章将会描述Java框架**Spring Boot**、服务治理框架**Dubbo**、应用容器引擎**Docker**,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。   **本文为Docker容器部署,包括Dubbo微服务、Zookeeper、Dubbo-admin的部署**
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)