【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】

介绍

Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。和ZK的原生客户端相比,Curator的抽象层次要更高,同时简化了ZK的常用功能开发量,比如Curator自带连接重试、反复注册Watcher、NodeExistsException 异常处理等等。

根据官方的介绍,我们可以了解到它是一个用于分布式的Java客户端API工具。它基于high-level API,拥有它可以更简单易懂的指挥Zookeeper实现分布式安全应用程序开发。

Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-frameworkcurator-recipes,以及广为熟知的 分布式锁

Curator 当然也包括许多扩展,比如服务发现基于Java 8异步DSL


Apache Curator is a Java/JVM client library for [Apache ZooKeeper](https://zookeeper.apache.org/), a distributed coordination service.
Apache Curator includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

用官方的介绍来说就是:guava之于java就像curator之于zookeeper

ZK 版本支持

Curator 目前最新的版本为 5.X 的版本,已经不支持 ZK 的 3.4.X 以及之前的版本,这里经过考虑最终选择了 ZK的 3.5.10 版本。

5.X 对于 Curator 做了不少破坏性的改动,不兼容的原因如下:

  • 旧的ListenerContainer类已经被移除,以避免Guava类泄漏。
  • ConnectionHandlingPolicy和相关类已被删除
  • Reaper和ChildReaper类/recipes已被删除。您应该改用 ZooKeeper 容器节点。
  • newPersistentEphemeralNode()和newPathChildrenCache()已从GroupMember中移除。
  • ServiceCacheBuilder< T> executorService(CloseableExecutorService executorService)已从ServiceCacheBuilder中移除。
  • ServiceProviderBuilder< T> executorService(CloseableExecutorService executorService)已从ServiceProviderBuilder中移除。
  • static boolean shouldRetry(int rc)已从RetryLoop中移除。
  • static boolean isRetryException(Throwable exception)已从RetryLoop中移除。

官网地址

Apache Curator

下载地址

Curator Maven 相关地址:mvnrepository.com/artifact/or…

Curator jar包下载地址:cwiki.apache.org/confluence/…

快速开始

ZK 集群部署

学习之前需要使用ZK搭建集群环境,方便Debug的时候调试代码。这部分搭建过程放到另一篇:

[【Zookeeper】基于3台linux虚拟机搭建zookeeper集群]

Maven依赖引入

下面是对应的Zookeeper和Curator的版本选择。


<curator.version>4.3.0</curator.version>  
<zookeeper.version>3.5.10</zookeeper.version>


<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zookeeper.version}</version>
  </dependency>

构建入门实例

Curator 最为核心和强大并且常用功能是分布式锁。在入门demo中可以看到整个 Curator 依靠 CuratorFrameworkFactory 构建,使用 Curator 进行分布式加锁解锁操作,只需要为所连接的每个ZooKeeper集群提供一个CuratorFramework对象。


CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

上面的方法将会使用默认值创建与ZooKeeper集群的连接,唯一需要关注的是重试策略。


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

根据参数值可以大致了解到,这里使用的策略是指数的方式递增间隔尝试重试时间,并且最终重试三次


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1;192.168.0.2;192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk的一个连接实例。  
//.....

拥有了 CuratorFramework 实例之后,就可以直接通过 API 调用操作ZK。下面我们看一下重点以及使用最多的分布式锁的操作部分:


client.create().forPath("/my/path", myData)

这样的直接调用还有个好处是对于ZK的操作client实例如果碰到网络抖动等情况会自动重试。

可重入锁(公平锁)案例代码

下面是官网可重入锁的Demo使用代码。


InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

这里改造一下即可简单使用。


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk的一个连接实例。  
//.....  
client.create().forPath("/my/path", "Test".getBytes());  
InterProcessMutex lock = new InterProcessMutex(client, "/test/myLock");  
lock.acquire();  
try {  
    // do some work inside of the critical section here  
    Thread.sleep(3000);  
} finally {  
    lock.release();  
}

初始化过程流程图

初始化过程流程图全图如下。下面将会一步步拆解这幅图是如何拼凑的。

image.png

drawio 源文件和图片地址如下: 链接:pan.baidu.com/s/18PoMjkp1…提取码:4bug

初始化源码分析

直奔源码分析部分,本文主要介绍和Curator初始化、内部的通知机制以及会话管理部分。

CuratorFramework 初始化过程

初始化过程流程图

CuratorFramework 初始化过程下面截图这一部分,红色部分为个人认为相对比较重要的对象和变量。

image.png

CuratorFrameworkFactory.newClient() 代码分析

Curator 当中默认使用公平锁的策略去获取锁,多个客户端会按照排队的顺序挨个获取锁,下面我们通过代码进行验证。


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

在获取分布式锁之前我们需要先连接ZK集群,整个过程通过两行代码完成,首先需要确定连接ZK的重试策略,接着通过CuratorFrameworkFactory构建Curator 实例即可,Curator 内部根据ZK原生客户端做了一层封装,开发者使用过程中不需要关注。


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);

上面是简单的模板代码。ExponentialBackoffRetry 构建重试策略为按照指数增长重试时间,比如第一次1秒,第二次2秒,第三次4秒,第四次8秒.....

接着是利用CuratorFrameworkFactory构建实例。


return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);

这里强调一下两个常量 DEFAULT_SESSION_TIMEOUT_MS (默认的会话超时时间)、DEFAULT_CONNECTION_TIMEOUT_MS(默认的连接超时时间),作用是传入指定的重试策略默认参数。


private static final int DEFAULT_SESSION_TIMEOUT_MS
    = Integer.getInteger("curator-default-session-timeout", 60 * 1000)


private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);

我们进一步进入构造方法,这里用了建造者模式。


return builder().  
    connectString(connectString).  
    sessionTimeoutMs(sessionTimeoutMs).  
    connectionTimeoutMs(connectionTimeoutMs).  
    retryPolicy(retryPolicy).  
    build();

实际上调用的是CuratorFrameworkImpl实例。这里把CuratorFrameworkFactorythis引用逸出给CuratorFrameworkImpl对象。


return new CuratorFrameworkImpl(this);

CuratorFrameworkImpl 构造方法的内容比较多,这里主要说一下CuratorZookeeperClient这个对象,相当于ZK原生客户端的封装对象。

其他组件内容和Curator 的各种通知管理和会话管理等等功能有关。


public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)  
{  
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());  
    this.client = new CuratorZookeeperClient  
        (  
            localZookeeperFactory,  
            builder.getEnsembleProvider(),  
            builder.getSessionTimeoutMs(),  
            builder.getConnectionTimeoutMs(),  
            builder.getWaitForShutdownTimeoutMs(),  
            new Watcher()  
            {  
                @Override  
                public void process(WatchedEvent watchedEvent)  
                {  
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);  
                    processEvent(event);  
                }  
            },  
            builder.getRetryPolicy(),  
            builder.canBeReadOnly(),  
            builder.getConnectionHandlingPolicy()  
        );  
  //用于判断连接断开和连接超时的状态,设置curator的连接状态,并通过connectionStateManager触发连接事件状态通知
    internalConnectionHandler = new StandardInternalConnectionHandler();
    //接收事件的通知。后台线程操作事件和连接状态事件会触发 
    listeners = new ListenerContainer<CuratorListener>();  
    //当后台线程发生异常或者handler发生异常的时候会触发
    unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();  
    //后台线程执行的操作队列
    backgroundOperations = new DelayQueue<OperationAndData<?>>();  
    forcedSleepOperations = new LinkedBlockingQueue<>();  
    //命名空间
    namespace = new NamespaceImpl(this, builder.getNamespace());  
//线程工厂方法,初始化后台线程池时会使用
    threadFactory = getThreadFactory(builder);  
maxCloseWaitMs = builder.getMaxCloseWaitMs();  
//负责连接状态变化时的通知
    connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());  
    compressionProvider = builder.getCompressionProvider();  
    aclProvider = builder.getAclProvider();  
    //CuratorFrameworkImpl的状态,调用start方法之前为 LATENT,调用start方法之后为 STARTED ,调用close()方法之后为STOPPEDstate = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);  
    useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); 
    //错误连接策略 
    connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");  
    schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");  
    zk34CompatibilityMode = builder.isZk34CompatibilityMode();  
    byte[] builderDefaultData = builder.getDefaultData();  
    defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];  
    authInfos = buildAuths(builder);  
  //有保障的执行删除操作,其实是不断尝试直到删除成功,通过递归调用实现
    failedDeleteManager = new FailedDeleteManager(this);  
    //有保障的执行删除watch操作
    failedRemoveWatcherManager = new FailedRemoveWatchManager(this);  
    namespaceFacadeCache = new NamespaceFacadeCache(this);  
  //服务端可用节点的检测器,第一次连接和重连成功之后都会触发重新获取服务端列表
    ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());  
    runSafeService = makeRunSafeService(builder);

newClient的目的是构建ZK连接实例,包括一系列附加核心组件:后台操作、连接事件、异常监控、容器,命名空间、负载均衡等等。

CuratorZookeeperClient 初始化过程

CuratorZookeeperClient 初始化过程流程图

CuratorZookeeperClient 初始化过程图如下:

image.png

CuratorZookeeperClient 初始化代码分析

上面提到,CuratorFrameworkImpl的初始化过程中有一段比较重要的CuratorZookeeperClient客户端初始化过程,下面就来看看这个CuratorZookeeperClient初始化过程干了啥。


public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
    // StandardConnectionHandler当收到Disconnect事件后,如果在规定时间内没有重连到服务器,则会主动触发Expired事件
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }
    // 重连策略
        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
        // //curator注册到原生客户端上的defaultWatcher,会收到和连接状态有关的事件通知等,负责超时重连
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
    //  重试策略设置
        setRetryPolicy(retryPolicy);
    }

ConnectionState是curator注册到原生客户端上的defaultWatcher,它会收到和连接状态有关的事件通知等,负责超时重连操作等。

下面看下ConnectionState的构造方法。


ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)  
{  
    this.ensembleProvider = ensembleProvider;  
    this.sessionTimeoutMs = sessionTimeoutMs;  
    this.connectionTimeoutMs = connectionTimeoutMs;  
    this.tracer = tracer;  
    this.connectionHandlingPolicy = connectionHandlingPolicy;  
    if ( parentWatcher != null )  
    {  
      // 因为defaultWatcher只能有一个,通过parentWatchers可实现defaultWatcher接到事件通知时parentWatchers的回调
        parentWatchers.offer(parentWatcher);  
    }  
    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);  
}

parentWatchers 使用了并发安全队列 ConcurrentLinkedQueue,这个队列的作用可以如下:

ConcurrentLinkedQueue:一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。


private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();

ConnectionStateManager 初始化过程

ConnectionStateManager 初始化过程流程图

ConnectionStateManager 主要是持有Client引用,通过连接状态管理工程创建构建监听器,以及构建只允许一个线程执行的线程池。

Curator 的设计记录是一个客户端永远只有一个线程负责工作。

image.png

ConnectionStateManager 初始化代码分析

在Curator框架初始化代码中包含了 ConnectionStateManager 初始化,它主要负责状态维护和连接状态变更通知。


//负责连接状态变化时的通知
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());

从初始化代码可以看到,如果要监听状态改变,需要注册一个监听器。相关的注册方式在“通知管理”部分进行介绍,这里我们来看下相关的成员变量以及初始化方法。


//连接状态事件通知队列
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
//需要通知的listeners 
private final UnaryListenerManager<ConnectionStateListener> listeners;
//ConnectionStateManager的运行状态 
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

org.apache.curator.framework.state.ConnectionStateManager#ConnectionStateManager(org.apache.curator.framework.CuratorFramework, java.util.concurrent.ThreadFactory, int, int, org.apache.curator.framework.state.ConnectionStateListenerManagerFactory)


/**
Params:
client – the client 
threadFactory – thread factory to use or null for a default 
sessionTimeoutMs – the ZK session timeout in milliseconds 
sessionExpirationPercent – percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all 
managerFactory – manager factory to use
*/
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory)  
{  
    this.client = client;  
    this.sessionTimeoutMs = sessionTimeoutMs;  
    this.sessionExpirationPercent = sessionExpirationPercent;  
    if ( threadFactory == null )  
    {  
        threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");  
    }  
    //事件队列处理线程池
    service = Executors.newSingleThreadExecutor(threadFactory);  
    // 构建监听器队列
    listeners = managerFactory.newManager(client);  
}

CuratorFrameworkImpl 启动过程

启动过程的主要工作是启动 ConnectionStateManager,同时负责连接事件的通知准备,接着是启动 CuratorZookeeperClient ,建立服务端会话连接等操作,最后通过开启一个单独的线程监听执行后台任务队列,这个线程的工作是不断从任务队列取出元素并且执行。

CuratorFrameworkImpl 启动过程流程图

image.png

客户端连接 client.start();

调用start 方法的代码如下:


client.start();

通过下面CAS操作将当前状态更新为 STARTED,同时根据if逻辑可以得知start()方法不允许重复调用,这和 JDK的 Thread 设计思路比较相似,Thread 同样只允许执行一次start()方法。

CAS 操作成功则构建连接监听器监听异常连接状态,监听器中判断当前客户端是否已经连接或者正在重连,如果是则logAsErrorConnectionErrors=true

client.start(); 内部逻辑如下,这个方法的代码都比较简单,具体可以参考注释理解。


public void start()  
{  
    log.info("Starting");  
    // 使用CAS把当前的运行状态切换为 STARTED,状态切换之后不可逆
    // LATENT:CuratorFramework.start() has not yet been called
    // STARTED: CuratorFramework.start() has been called
    if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )  
    {  
        throw new IllegalStateException("Cannot be started more than once");  
    }  
    try  
    {  
      // ordering dependency - must be called before client.start()  
      // 顺序依赖 - 必须在 client.start()之前调用。 
        connectionStateManager.start(); 
    // 构建连接监听器,监听异常连接状态
        final ConnectionStateListener listener = new ConnectionStateListener()  
        {  
            @Override  
            public void stateChanged(CuratorFramework client, ConnectionState newState)  
            {  
              // CONNECTED:为第一次成功连接到服务器而发送。注意:对于任何一个CuratorFramework实例只会收到其中一条信息。
              // RECONNECTED:一个暂停的、丢失的或只读的连接已被重新建立
              // RECONNECTED:A suspended, lost, or read-only connection has been re-established
              // 如果已经连接或者正在重连
                if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )  
                {  
                    logAsErrorConnectionErrors.set(true);  
                }  
            }  
            @Override  
            public boolean doNotDecorate()  
            {  
                return true;  
            }  
        };  
    // 注册监听器
        this.getConnectionStateListenable().addListener(listener);  
    // 全局启动开发设置为true,ConnectionState 状态更新
        client.start();  
    // 构建线程池
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);         // 执行具备返回值的Callable 任务
        executorService.submit(new Callable<Object>()  
        {  
            @Override  
            public Object call() throws Exception  
            {  
              // 关键部分:挂起后台操作
                backgroundOperationsLoop();  
                return null;            
      }  
        });  
        if ( ensembleTracker != null )  
        {  
            ensembleTracker.start();  
        }  
        log.info(schemaSet.toDocumentation());  
    }  
    catch ( Exception e )  
    {  
        ThreadUtils.checkInterrupted(e);  
        handleBackgroundOperationException(null, e);  
    }  
}

我们继续看关键部分backgroundOperationsLoop();

后台轮询操作指令 backgroundOperationsLoop()

这里再介绍backgroundOperationsLoop()方法,根据名称得知这是一个后台循环,后台任务的整体流程如下:


private void backgroundOperationsLoop()  
{  
    try  
    {  
        while ( state.get() == CuratorFrameworkState.STARTED )  
        {  
            OperationAndData<?> operationAndData;  
            try            
            {  
                operationAndData = backgroundOperations.take();  
                if ( debugListener != null )  
                {  
                    debugListener.listen(operationAndData);  
                }  
                // 执行后台操作
                performBackgroundOperation(operationAndData);  
            }  
            catch ( InterruptedException e )  
            {  
              // 在这里中断异常会被吞掉。
                // swallow the interrupt as it's only possible from either a background  
                // operation and, thus, doesn't apply to this loop or the instance                // is being closed in which case the while test will get it            }  
        }  
    }  
    finally  
    {  
        log.info("backgroundOperationsLoop exiting");  
    }  
}

OperationAndData 实现了 Delayed 接口用于实现阻塞队列延迟重试。

上面的处理逻辑如下:

  1. 判断当前是否为STARTED状态,一直循环。
  2. 从阻塞队列BlockingQueue当中弹出操作指令对象,在初始化代码中可以得知是一个DelayQueue,延迟并发安全阻塞队列,OperationAndData 对象毫无疑问实现了Delayed接口。


backgroundOperations = new DelayQueue<OperationAndData<?>>();
  1. 判断Debug 监听器是否存在,如果存在则监听OperationAndData
  2. 执行后台操作performBackgroundOperation,他的工作是从阻塞队列不断获取数据操作OperationAndData 对象调用callPerformBackgroundOperation方法,
  3. 如果无法正常连接ZK集群,此时会else并且进入到重连判断逻辑,如果符合条件,则添加到阻塞队列的当中等待下一次重试。(注意这里是主动重试,同步操作)


void performBackgroundOperation(OperationAndData<?> operationAndData)
    {
        try
        {
            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
            {
                operationAndData.callPerformBackgroundOperation();
            }
            else
            {
              // 允许重连或者超时这样的情况发生
                client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur
        // 如果连接超时,则跑出 CuratorConnectionLossException 异常
                if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
                {
                    throw new CuratorConnectionLossException();
                }
                // 如果没有超时,则推入到 forcedSleepOperations 强制睡眠后等待重连
                sleepAndQueueOperation(operationAndData);
            }
        }
        catch ( Throwable e )
        {
          // 检查线程中断
            ThreadUtils.checkInterrupted(e);
            /**
             * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
             * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
             * and callbacks need to get invoked, etc.
             */
             /*
             修复报告为CURATOR-52的边缘案例。当初始(或之前失败的)连接无法重新建立时,ConnectionState.checkTimeouts()会抛出KeeperException.ConnectionLossException。这需要通过重试策略运行,回调需要被调用,等等。
             */
             // 连接丢失异常处理
            if ( e instanceof CuratorConnectionLossException )
            {
                WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
                CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
                // 如果重连次数
                if ( checkBackgroundRetry(operationAndData, event) )
                {
                  // 推送到backgroundOperations队列尝试重连
                    queueOperation(operationAndData);
                }
                else
                {
                  // 放弃重连
                    logError("Background retry gave up", e);
                }
            }
            else
            {
              // 否则需要处理后台操作异常
                handleBackgroundOperationException(operationAndData, e);
            }
        }
    }

这里顺带介绍下后台决定是否重试的判断逻辑,主要是根据用户传输的重试策略执行对应的重试逻辑判断,是非常经典的策略模式实现。


client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData)

【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)https://developer.aliyun.com/article/1395298

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
7月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
110 0
|
4月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
101 0
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
51 0
|
6月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
55 1
|
6月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
129 0
|
7月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
77 11
|
7月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
7月前
|
Dubbo Java 应用服务中间件
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)
此系列文章将会描述Java框架**Spring Boot**、服务治理框架**Dubbo**、应用容器引擎**Docker**,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。   **本文为Docker容器部署,包括Dubbo微服务、Zookeeper、Dubbo-admin的部署**
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2