【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】

【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)https://developer.aliyun.com/article/1395297

operationAndData.callPerformBackgroundOperation() 后台任务执行

operationAndData 继承了DelayQueue,运用多态特性拥有不同实现,内部只有一行代码:


void callPerformBackgroundOperation() throws Exception  
{  
    operation.performBackgroundOperation(this);  
}

operation.performBackgroundOperation(this);  对应 org.apache.curator.framework.imps.BackgroundOperation#performBackgroundOperation

image.png

BackgroundOperation 后台操作有很多具体的实现,对应了ZK常见操作。传递的this就是 operationAndData 对象。

image.png

会话管理

Client 连接过程的连接状态都是通过 ConnectionState 进行管理的,它会负责尝试超时重连的操作,ConnectionStateManager 会负责连接状态的改变和通知,ConnectionHandlingPolicy  则对应了连接超时策略的触发。

在前面的后台轮询队列操作指令对象过程中,也允许在超时时间内尝试重连,那么 Curator 是如何进行客户端 会话状态通知以及会话超时重连的?

连接事件监听和状态变更 org.apache.curator.ConnectionState#process

org.apache.curator.ConnectionState#process的代码可以得知,连接状态相关的事件类型为Watcher.Event.EventType.None,会通知到所有的Wathcer。

其中ConnectionState作为 defaultWatcher ,它的事件回调如下:


public void process(WatchedEvent event)  
{  
    if ( LOG_EVENTS )  
    {  
        log.debug("ConnectState watcher: " + event);  
    }  
    if ( event.getType() == Watcher.Event.EventType.None )  
    {  
    //isConnected:客户当前的连接状态,true表示已连接(SyncConnected 和 ConnectedReadOnly 状态)
        boolean wasConnected = isConnected.get(); 
        // 根据 org.apache.zookeeper.Watcher.Event.KeeperState 进行状态判断。 
        boolean newIsConnected = checkState(event.getState(), wasConnected);  
        if ( newIsConnected != wasConnected )  
        {  
          // /如果连接状态发生改变,则更新
            isConnected.set(newIsConnected);  
            connectionStartMs = System.currentTimeMillis();  
            if ( newIsConnected )  
            {  
      //重连,更新会话超时协商时间
      // NegotiatedSessionTimeoutMs(协商会话超时)。
                      lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());  
                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());  
            }  
        }  
    }  
  // 通知parentWatchers, 注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent
    for ( Watcher parentWatcher : parentWatchers )  
    {  
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());  
        parentWatcher.process(event);  
        trace.commit();  
    }  
}

最后一段注释提到可以看到遍历parentWatchers并且调用process方法。这里实际上默认会有个Watcher,那就是在初始化的时候默认会注册一个Watch作为parentWatcher传入。


this.client = new CuratorZookeeperClient  
        (  
            localZookeeperFactory,  
            builder.getEnsembleProvider(),  
            builder.getSessionTimeoutMs(),  
            builder.getConnectionTimeoutMs(),  
            builder.getWaitForShutdownTimeoutMs(),  
            new Watcher()  
            {  
                @Override  
                public void process(WatchedEvent watchedEvent)  
                {  
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);  
                    // 注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent
                    processEvent(event);  
                }  
            },  
            builder.getRetryPolicy(),  
            builder.canBeReadOnly(),  
            builder.getConnectionHandlingPolicy()  
        );

image.png

这部分通知事件回调在下文会再次提到,这里简单有关印象即可。

连接状态检查和处理 org.apache.curator.ConnectionState#checkState

连接状态检查和处理在org.apache.curator.ConnectionState#checkState方法中进行。


boolean newIsConnected = checkState(event.getState(), wasConnected);


private boolean checkState(Event.KeeperState state, boolean wasConnected)  
{  
    boolean isConnected = wasConnected;  
    boolean checkNewConnectionString = true;  
    switch ( state )  
    {  
    default:  
    case Disconnected:  
    {  
        isConnected = false;  
        break;    }  
    case SyncConnected:  
    case ConnectedReadOnly:  
    {  
        isConnected = true;  
        break;    }  
  // 访问权限异常
    case AuthFailed:  
    {  
        isConnected = false;  
        log.error("Authentication failed");  
        break;    }  
    case Expired:  
    {  
        isConnected = false;  
        checkNewConnectionString = false;  
        handleExpiredSession();  
        break;    }  
    case SaslAuthenticated:  
    {  
        // NOP  
        break;  
    }  
    }  
    // the session expired is logged in handleExpiredSession, so not log here  
    // 会话过期被记录在handleExpiredSession中,所以不记录在这里。 
    if (state != Event.KeeperState.Expired) {  
        new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();  
    }  
    if ( checkNewConnectionString )  
    {  
      //如果服务端列表发生变化,则更新
        String newConnectionString = handleHolder.getNewConnectionString();  
        if ( newConnectionString != null )  
        {  
            handleNewConnectionString(newConnectionString);  
        }  
    }  
    return isConnected;  
}

上面根据不同连接状态判断连接是否异常, 返回结果为true则表示连接是正常的,当会话超时过期Expired时,会调用handleExpiredSession进行reset操作(会话被动重连),这里对于非连接超时的状态进行时间追踪。

注意重连策略 RetryPolicy这个策略在主动和被动重连中均会调用。

parentWatchers 注册和回调

发生状态变更的方法最后部分是通知所有的parentWatchers,下面来看看这个循环干了什么事情。

再次强调初始化的时候传入了一个 parentWatcher,会调用CuratorFrameworkImpl.processEvent 方法,现在来看看这部分是如何注册和回调的。


// 通知parentWatchers,注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent
    for ( Watcher parentWatcher : parentWatchers )  
    {  
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());  
        parentWatcher.process(event);  
        trace.commit();  
    }

我们直接看看这个默认的Watcher回调CuratorFrameworkImpl#processEvent(event) 相关代码逻辑。


new Watcher()  
{  
    @Override  
    public void process(WatchedEvent watchedEvent)  
    {  
        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
        // 处理事件  
        processEvent(event);  
    }  
},

processEvent(event)相关逻辑如下,首先对于状态变更判断,状态如果出现变更则通知到所有注册在 CuratorListener 上的监听器。


private void processEvent(final CuratorEvent curatorEvent)  
{  
    if ( curatorEvent.getType() == CuratorEventType.WATCHED )  
    {  
      //状态转换
        validateConnection(curatorEvent.getWatchedEvent().getState());  
    }  
    //通知所有注册的CuratorListener
    listeners.forEach(new Function<CuratorListener, Void>()  
    {  
        @Override  
        public Void apply(CuratorListener listener)  
        {  
            try  
            {  
                OperationTrace trace = client.startAdvancedTracer("EventListener");  
                // 接收回调事件
                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);  
                trace.commit();  
            }  
            catch ( Exception e )  
            {  
                ThreadUtils.checkInterrupted(e);  
                logError("Event listener threw exception", e);  
            }  
            return null;  
        }  
    });  
}

其中validateConnection 负责连接状态的转换代码。

org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection


void validateConnection(Watcher.Event.KeeperState state)  
{  
    if ( state == Watcher.Event.KeeperState.Disconnected )  
    {  
        internalConnectionHandler.suspendConnection(this);  
    }  
    else if ( state == Watcher.Event.KeeperState.Expired )  
    {  
        connectionStateManager.addStateChange(ConnectionState.LOST);  
    }  
    else if ( state == Watcher.Event.KeeperState.SyncConnected )  
    {  
        internalConnectionHandler.checkNewConnection(this);  
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);  
        unSleepBackgroundOperations();  
    }  
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )  
    {  
        internalConnectionHandler.checkNewConnection(this);  
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);  
    }  
}

可以看到实际的状态变更是依靠 ConnectionStateManager 组件负责的,ZK的原生客户端状态和Curator包装的状态对应表如下:

image.png

此外还需要注意每一个 if 判断的最后一行代码中有一个添加 ConnectionState 的操作,这个操作的意义是通知所有注册到 listenersConnectionStateListener


connectionStateManager.addStateChange(ConnectionState.READ_ONLY);

至于怎么通知的会在下文介绍。

通知机制

通知是干什么?其实就是在事件发生的时候,及时回调注册的Listenrner监听器对应的回调函数。Curator 针对不同组件设计了不同的监听器注册和回调。


// 自定义监听器 CuratorListener
client.getCuratorListenable().addListener((_fk, e) -> {
  if (e.getType().equals(CuratorEventType.WATCHED)) {
    log.info("测试");
  }
});
ConnectionStateListener connectionStateListener = (client1, newState) -> {
  //Some details
  log.info("newState => "+ newState);
};

可以注册的监听器方式如下:

  • 一次性 Watch 通知
  • 注册 CuratorListener 通知
  • 注册 ConnectionStateListener 通知
  • 注册 UnhandledErrorListener 通知
  • 后台线程操作完成时的回调通知
  • 缓存机制,多次注册

一次性 Watch 通知

每次都需要反复通过下面的方法重新注册。这里涉及到 NodeCache 的相关组件,由于目前并没有介绍相关的前置代码,这里暂时跳过介绍。


client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);

注册 CuratorListener 通知

实现方式很简单,就是把监听器注册到CuratorFrameworkImpl.listeners这个容器当中,后台线程完成操作通知该监听器容器的所有监听器。

比如异步的方式在ZK上面创建路径会触发CuratorEventType.CREATE事件,还有就是连接状态事件触发的时候parentWatcher也会回调这些listeners,比如下面的代码:


/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher) {
    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);
  // 自定义监听器 CuratorListener
    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();
                watcher.execute(event.getState(), event.getType(), event.getPath());
            }
        }
    });
    fk.start();
    return fk;
}

org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent

processEvent 方法总会进行注册的 CuratorListener 回调操作。


private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    OperationTrace trace = client.startAdvancedTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

具体回调则是有各种执行构建实现器完成的,这一块深究比较复杂,这里有个概念后续有需要查看相关实现即可。

image.png

注册 ConnectionStateListener 通知

如果添加 ConnectionStateListener 监听器,则在连接状态发生改变时,会收到通知。


ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
          //Some details
        }
    };
client.getConnectionStateListenable().addListener(connectionStateListener);

ConnectionStateListener 监听器的事件回调发生在ConnectionStateManager当中,但是前面我们只介绍了如何初始化,下面扩展介绍回调ConnectionStateListener的部分

ConnectionStateManager 如何回调 ConnectionStateListener?

org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection

上面讲解会话机制的时候,提到了最后有一个添加 ConnectionState 的操作,这里将介绍收到 ConnectionState 变更之后如何回调注册在自己身上的监听器。


void validateConnection(Watcher.Event.KeeperState state)  
{  
  // ......
    else if ( state == Watcher.Event.KeeperState.Expired )  
    {  
        connectionStateManager.addStateChange(ConnectionState.LOST);  
    }  
    else if ( state == Watcher.Event.KeeperState.SyncConnected )  
    {  
        unSleepBackgroundOperations();  
    }  
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )  
    {  
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);  
    }  
}

具体处理在下面这个方法中完成。

org.apache.curator.framework.state.ConnectionStateManager#processEvents


private void processEvents()
    {
        while ( state.get() == State.STARTED )
        {
            try
            {
                int useSessionTimeoutMs = getUseSessionTimeoutMs();
                long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                long pollMaxMs = useSessionTimeoutMs - elapsedMs;
                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                if ( newState != null )
                {
                    if ( listeners.isEmpty() )
                    {
                        log.warn("There are no ConnectionStateListeners registered.");
                    }
          // 关键部分,当出现状态变更进行回调监听器通知
                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                }
                else if ( sessionExpirationPercent > 0 )
                {
                    synchronized(this)
                    {
                        checkSessionExpiration();
                    }
                }
            }
            catch ( InterruptedException e )
            {
                // swallow the interrupt as it's only possible from either a background
                //  吞下中断,因为它只可能来自后台操作
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
                // 如果实例在关闭有可能走到这一块代码
            }
        }
    }

上面内容重要的其实就一行代码:


listeners.forEach(listener -> listener.stateChanged(client, newState));

这个processEvents是怎么回调的?其实在之前画的 CuratorFrameworkImpl 启动过程流程图中就有展示。

image.png

ConnectionStateManager 当中有一个 ExecutorService 线程池,翻看代码可以得知他的实现是 SingleThreadScheduledExecutor,这里含义明显就是单独开启一个线程轮询这一段代码检查 listener,状态变更通知注册在 ConnectionStateManager 上的监听器。

注册 UnhandledErrorListener 通知

同理注册到CuratorFrameworkImpl.unhandledErrorListeners当中,当后台线程操作发生异常或者handler发生异常的时候会触发。

注册方式


/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher) {
    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);
  // 自定义监听器 UnhandledErrorListener
   fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            String errmsg = "Unrecoverable zookeeper error, halting process: " + msg;
            LOG.error(errmsg, error);
            JStormUtils.halt_process(1, "Unrecoverable zookeeper error");
        }
    });
    fk.start();
    return fk;
}

如何触发?

触发的相关代码在org.apache.curator.framework.imps.CuratorFrameworkImpl#logError方法中,注意这里的apply方法处理。


void logError(String reason, final Throwable e)  
{  
  // 省略其他无关代码
    unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()  
    {  
        @Override  
        public Void apply(UnhandledErrorListener listener)  
        {  
            listener.unhandledError(localReason, e);  
            return null;        
            }  
    });  
  // 省略无关代码
}

后台线程操作完成时的回调通知

对于不同操作比如 setData,可以通过链式调用的方式传入回调函数 callback,操作完成之后会执行回调函数完成回调操作。


public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

缓存机制,多次注册

Curator的缓存机制是一块比较大的部头,Curator 的缓存方式包括:

  • Path Cache
  • Node Cache
  • Tree Cache

缓存在使用之前会和服务端的节点数据进行对比,当数据不一致时,会通过watch机制触发回调刷新本地缓存,同时再次注册Watch,每次重连会注册新的 Watcher,保证 Watcher永远不丢失。

小结

通过通知机制和会话管理两个部分,我们了解到:

  • 客户端通知是同步完成。
  • connectionStateManager.listeners是由内部的线程池做异步通知
  • CuratorFrameworkImpl.listeners 对于连接状态的通知,与watcher通知线程为同步,由后台线程通知时为异步
  • watcher注册过多可能导致重连之后watcher丢失。

写到最后

本节介绍了Curator的基础使用,从源码角度分析了Curator 组件的初始化过程,并且简单分析会话管理和通知机制的相关源码调用。

下面是本文涉及到的源码讲解汇总的一副总图。个人源码分析过程如果有存在错误或者疑问欢迎反馈和讨论。

image.png

最后是整个demo代码:


@Slf4j
public class CuratorTestExample {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);
        // 连接ZK,开启连接
        // 自定义监听器 CuratorListener
        client.getCuratorListenable().addListener((_fk, e) -> {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                log.info("测试");
            }
        });
        ConnectionStateListener connectionStateListener = (client1, newState) -> {
            //Some details
            log.info("newState => "+ newState);
        };
        // 11:31:17.026 [Curator-ConnectionStateManager-0] INFO com.zxd.interview.zkcurator.CuratorTestExample - newState => CONNECTED
        client.getConnectionStateListenable().addListener(connectionStateListener);
        client.start();
        // 此处就获取到 zk的一个连接实例。
        //.....
        // 创建znode,如果有必要需要创建父目录
        client.create().creatingParentsIfNeeded().withProtection().forPath("/my/path", "Test".getBytes());
        InterProcessMutex lock = new InterProcessMutex(client, "/my/path");
        lock.acquire();
        try {
            // do some work inside of the critical section here
            Thread.sleep(1000);
        } finally {
            lock.release();
        }
    }
}

推荐阅读

ZK客户端Curator使用详解 - 知乎 (zhihu.com)

cloud.tencent.com/developer/a…

Curator目录监听 | Ravitn Blog (donaldhan.github.io)


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
7月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
110 0
|
4月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
101 0
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
51 0
|
6月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
55 1
|
6月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
129 0
|
7月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
77 11
|
7月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
7月前
|
Dubbo Java 应用服务中间件
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)
此系列文章将会描述Java框架**Spring Boot**、服务治理框架**Dubbo**、应用容器引擎**Docker**,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。   **本文为Docker容器部署,包括Dubbo微服务、Zookeeper、Dubbo-admin的部署**
微服务框架(七)Docker容器部署(Dubbo、Zookeeper、Dubbo-admin)
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2