【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)https://developer.aliyun.com/article/1395297
operationAndData.callPerformBackgroundOperation() 后台任务执行
operationAndData 继承了DelayQueue,运用多态特性拥有不同实现,内部只有一行代码:
void callPerformBackgroundOperation() throws Exception { operation.performBackgroundOperation(this); }
operation.performBackgroundOperation(this); 对应 org.apache.curator.framework.imps.BackgroundOperation#performBackgroundOperation
BackgroundOperation 后台操作有很多具体的实现,对应了ZK常见操作。传递的this就是 operationAndData 对象。
会话管理
Client 连接过程的连接状态都是通过 ConnectionState 进行管理的,它会负责尝试超时重连的操作,ConnectionStateManager 会负责连接状态的改变和通知,ConnectionHandlingPolicy 则对应了连接超时策略的触发。
在前面的后台轮询队列操作指令对象过程中,也允许在超时时间内尝试重连,那么 Curator 是如何进行客户端 会话状态通知以及会话超时重连的?
连接事件监听和状态变更 org.apache.curator.ConnectionState#process
从org.apache.curator.ConnectionState#process
的代码可以得知,连接状态相关的事件类型为Watcher.Event.EventType.None
,会通知到所有的Wathcer。
其中ConnectionState
作为 defaultWatcher ,它的事件回调如下:
public void process(WatchedEvent event) { if ( LOG_EVENTS ) { log.debug("ConnectState watcher: " + event); } if ( event.getType() == Watcher.Event.EventType.None ) { //isConnected:客户当前的连接状态,true表示已连接(SyncConnected 和 ConnectedReadOnly 状态) boolean wasConnected = isConnected.get(); // 根据 org.apache.zookeeper.Watcher.Event.KeeperState 进行状态判断。 boolean newIsConnected = checkState(event.getState(), wasConnected); if ( newIsConnected != wasConnected ) { // /如果连接状态发生改变,则更新 isConnected.set(newIsConnected); connectionStartMs = System.currentTimeMillis(); if ( newIsConnected ) { //重连,更新会话超时协商时间 // NegotiatedSessionTimeoutMs(协商会话超时)。 lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs()); log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get()); } } } // 通知parentWatchers, 注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent for ( Watcher parentWatcher : parentWatchers ) { OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId()); parentWatcher.process(event); trace.commit(); } }
最后一段注释提到可以看到遍历parentWatchers
并且调用process
方法。这里实际上默认会有个Watcher,那就是在初始化的时候默认会注册一个Watch作为parentWatcher传入。
this.client = new CuratorZookeeperClient ( localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), builder.getWaitForShutdownTimeoutMs(), new Watcher() { @Override public void process(WatchedEvent watchedEvent) { CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); // 注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent processEvent(event); } }, builder.getRetryPolicy(), builder.canBeReadOnly(), builder.getConnectionHandlingPolicy() );
这部分通知事件回调在下文会再次提到,这里简单有关印象即可。
连接状态检查和处理 org.apache.curator.ConnectionState#checkState
连接状态检查和处理在org.apache.curator.ConnectionState#checkState
方法中进行。
boolean newIsConnected = checkState(event.getState(), wasConnected);
private boolean checkState(Event.KeeperState state, boolean wasConnected) { boolean isConnected = wasConnected; boolean checkNewConnectionString = true; switch ( state ) { default: case Disconnected: { isConnected = false; break; } case SyncConnected: case ConnectedReadOnly: { isConnected = true; break; } // 访问权限异常 case AuthFailed: { isConnected = false; log.error("Authentication failed"); break; } case Expired: { isConnected = false; checkNewConnectionString = false; handleExpiredSession(); break; } case SaslAuthenticated: { // NOP break; } } // the session expired is logged in handleExpiredSession, so not log here // 会话过期被记录在handleExpiredSession中,所以不记录在这里。 if (state != Event.KeeperState.Expired) { new EventTrace(state.toString(), tracer.get(), getSessionId()).commit(); } if ( checkNewConnectionString ) { //如果服务端列表发生变化,则更新 String newConnectionString = handleHolder.getNewConnectionString(); if ( newConnectionString != null ) { handleNewConnectionString(newConnectionString); } } return isConnected; }
上面根据不同连接状态判断连接是否异常, 返回结果为true则表示连接是正常的,当会话超时过期Expired
时,会调用handleExpiredSession
进行reset
操作(会话被动重连),这里对于非连接超时的状态进行时间追踪。
注意重连策略 RetryPolicy这个策略在主动和被动重连中均会调用。
parentWatchers 注册和回调
发生状态变更的方法最后部分是通知所有的parentWatchers,下面来看看这个循环干了什么事情。
再次强调初始化的时候传入了一个 parentWatcher,会调用CuratorFrameworkImpl.processEvent
方法,现在来看看这部分是如何注册和回调的。
// 通知parentWatchers,注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent for ( Watcher parentWatcher : parentWatchers ) { OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId()); parentWatcher.process(event); trace.commit(); }
我们直接看看这个默认的Watcher回调CuratorFrameworkImpl#processEvent(event)
相关代码逻辑。
new Watcher() { @Override public void process(WatchedEvent watchedEvent) { CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); // 处理事件 processEvent(event); } },
processEvent(event)
相关逻辑如下,首先对于状态变更判断,状态如果出现变更则通知到所有注册在 CuratorListener 上的监听器。
private void processEvent(final CuratorEvent curatorEvent) { if ( curatorEvent.getType() == CuratorEventType.WATCHED ) { //状态转换 validateConnection(curatorEvent.getWatchedEvent().getState()); } //通知所有注册的CuratorListener listeners.forEach(new Function<CuratorListener, Void>() { @Override public Void apply(CuratorListener listener) { try { OperationTrace trace = client.startAdvancedTracer("EventListener"); // 接收回调事件 listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent); trace.commit(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); logError("Event listener threw exception", e); } return null; } }); }
其中validateConnection
负责连接状态的转换代码。
org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection
void validateConnection(Watcher.Event.KeeperState state) { if ( state == Watcher.Event.KeeperState.Disconnected ) { internalConnectionHandler.suspendConnection(this); } else if ( state == Watcher.Event.KeeperState.Expired ) { connectionStateManager.addStateChange(ConnectionState.LOST); } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.RECONNECTED); unSleepBackgroundOperations(); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } }
可以看到实际的状态变更是依靠 ConnectionStateManager 组件负责的,ZK的原生客户端状态和Curator包装的状态对应表如下:
此外还需要注意每一个 if
判断的最后一行代码中有一个添加 ConnectionState 的操作,这个操作的意义是通知所有注册到 listeners
的ConnectionStateListener
。
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
至于怎么通知的会在下文介绍。
通知机制
通知是干什么?其实就是在事件发生的时候,及时回调注册的Listenrner监听器对应的回调函数。Curator 针对不同组件设计了不同的监听器注册和回调。
// 自定义监听器 CuratorListener client.getCuratorListenable().addListener((_fk, e) -> { if (e.getType().equals(CuratorEventType.WATCHED)) { log.info("测试"); } }); ConnectionStateListener connectionStateListener = (client1, newState) -> { //Some details log.info("newState => "+ newState); };
可以注册的监听器方式如下:
- 一次性 Watch 通知
- 注册 CuratorListener 通知
- 注册 ConnectionStateListener 通知
- 注册 UnhandledErrorListener 通知
- 后台线程操作完成时的回调通知
- 缓存机制,多次注册
一次性 Watch 通知
每次都需要反复通过下面的方法重新注册。这里涉及到 NodeCache 的相关组件,由于目前并没有介绍相关的前置代码,这里暂时跳过介绍。
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
注册 CuratorListener 通知
实现方式很简单,就是把监听器注册到CuratorFrameworkImpl.listeners
这个容器当中,后台线程完成操作通知该监听器容器的所有监听器。
比如异步的方式在ZK上面创建路径会触发CuratorEventType.CREATE事件,还有就是连接状态事件触发的时候parentWatcher也会回调这些listeners,比如下面的代码:
/** * connect ZK, register watchers */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); // 自定义监听器 CuratorListener fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.start(); return fk; }
org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent
processEvent
方法总会进行注册的 CuratorListener 回调操作。
private void processEvent(final CuratorEvent curatorEvent) { if ( curatorEvent.getType() == CuratorEventType.WATCHED ) { validateConnection(curatorEvent.getWatchedEvent().getState()); } listeners.forEach(new Function<CuratorListener, Void>() { @Override public Void apply(CuratorListener listener) { try { OperationTrace trace = client.startAdvancedTracer("EventListener"); listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent); trace.commit(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); logError("Event listener threw exception", e); } return null; } }); }
具体回调则是有各种执行构建实现器完成的,这一块深究比较复杂,这里有个概念后续有需要查看相关实现即可。
注册 ConnectionStateListener 通知
如果添加 ConnectionStateListener 监听器,则在连接状态发生改变时,会收到通知。
ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { //Some details } }; client.getConnectionStateListenable().addListener(connectionStateListener);
ConnectionStateListener 监听器的事件回调发生在ConnectionStateManager当中,但是前面我们只介绍了如何初始化,下面扩展介绍回调ConnectionStateListener
的部分
ConnectionStateManager 如何回调 ConnectionStateListener?
org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection
上面讲解会话机制的时候,提到了最后有一个添加 ConnectionState 的操作,这里将介绍收到 ConnectionState 变更之后如何回调注册在自己身上的监听器。
void validateConnection(Watcher.Event.KeeperState state) { // ...... else if ( state == Watcher.Event.KeeperState.Expired ) { connectionStateManager.addStateChange(ConnectionState.LOST); } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { unSleepBackgroundOperations(); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } }
具体处理在下面这个方法中完成。
org.apache.curator.framework.state.ConnectionStateManager#processEvents
private void processEvents() { while ( state.get() == State.STARTED ) { try { int useSessionTimeoutMs = getUseSessionTimeoutMs(); long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch; long pollMaxMs = useSessionTimeoutMs - elapsedMs; final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS); if ( newState != null ) { if ( listeners.isEmpty() ) { log.warn("There are no ConnectionStateListeners registered."); } // 关键部分,当出现状态变更进行回调监听器通知 listeners.forEach(listener -> listener.stateChanged(client, newState)); } else if ( sessionExpirationPercent > 0 ) { synchronized(this) { checkSessionExpiration(); } } } catch ( InterruptedException e ) { // swallow the interrupt as it's only possible from either a background // 吞下中断,因为它只可能来自后台操作 // operation and, thus, doesn't apply to this loop or the instance // is being closed in which case the while test will get it // 如果实例在关闭有可能走到这一块代码 } } }
上面内容重要的其实就一行代码:
listeners.forEach(listener -> listener.stateChanged(client, newState));
这个processEvents是怎么回调的?其实在之前画的 CuratorFrameworkImpl 启动过程流程图中就有展示。
ConnectionStateManager 当中有一个 ExecutorService 线程池,翻看代码可以得知他的实现是 SingleThreadScheduledExecutor,这里含义明显就是单独开启一个线程轮询这一段代码检查 listener,状态变更通知注册在 ConnectionStateManager 上的监听器。
注册 UnhandledErrorListener 通知
同理注册到CuratorFrameworkImpl.unhandledErrorListeners
当中,当后台线程操作发生异常或者handler发生异常的时候会触发。
注册方式
/** * connect ZK, register watchers */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); // 自定义监听器 UnhandledErrorListener fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable zookeeper error"); } }); fk.start(); return fk; }
如何触发?
触发的相关代码在org.apache.curator.framework.imps.CuratorFrameworkImpl#logError
方法中,注意这里的apply
方法处理。
void logError(String reason, final Throwable e) { // 省略其他无关代码 unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>() { @Override public Void apply(UnhandledErrorListener listener) { listener.unhandledError(localReason, e); return null; } }); // 省略无关代码 }
后台线程操作完成时的回调通知
对于不同操作比如 setData
,可以通过链式调用的方式传入回调函数 callback,操作完成之后会执行回调函数完成回调操作。
public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception { // this is another method of getting notification of an async completion client.setData().inBackground(callback).forPath(path, payload); }
缓存机制,多次注册
Curator的缓存机制是一块比较大的部头,Curator 的缓存方式包括:
- Path Cache
- Node Cache
- Tree Cache
缓存在使用之前会和服务端的节点数据进行对比,当数据不一致时,会通过watch机制触发回调刷新本地缓存,同时再次注册Watch,每次重连会注册新的 Watcher,保证 Watcher永远不丢失。
小结
通过通知机制和会话管理两个部分,我们了解到:
- 客户端通知是同步完成。
connectionStateManager.listeners
是由内部的线程池做异步通知CuratorFrameworkImpl.listeners
对于连接状态的通知,与watcher通知线程为同步,由后台线程通知时为异步。- watcher注册过多可能导致重连之后watcher丢失。
写到最后
本节介绍了Curator的基础使用,从源码角度分析了Curator 组件的初始化过程,并且简单分析会话管理和通知机制的相关源码调用。
下面是本文涉及到的源码讲解汇总的一副总图。个人源码分析过程如果有存在错误或者疑问欢迎反馈和讨论。
最后是整个demo代码:
@Slf4j public class CuratorTestExample { public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy); // 连接ZK,开启连接 // 自定义监听器 CuratorListener client.getCuratorListenable().addListener((_fk, e) -> { if (e.getType().equals(CuratorEventType.WATCHED)) { log.info("测试"); } }); ConnectionStateListener connectionStateListener = (client1, newState) -> { //Some details log.info("newState => "+ newState); }; // 11:31:17.026 [Curator-ConnectionStateManager-0] INFO com.zxd.interview.zkcurator.CuratorTestExample - newState => CONNECTED client.getConnectionStateListenable().addListener(connectionStateListener); client.start(); // 此处就获取到 zk的一个连接实例。 //..... // 创建znode,如果有必要需要创建父目录 client.create().creatingParentsIfNeeded().withProtection().forPath("/my/path", "Test".getBytes()); InterProcessMutex lock = new InterProcessMutex(client, "/my/path"); lock.acquire(); try { // do some work inside of the critical section here Thread.sleep(1000); } finally { lock.release(); } } }
推荐阅读
ZK客户端Curator使用详解 - 知乎 (zhihu.com)
cloud.tencent.com/developer/a…
Curator目录监听 | Ravitn Blog (donaldhan.github.io)