ConfigRpcTransportClient#executeConfigListen 核心方法
执行该方法的准备工作都有了,CacheMap 存在元素,阻塞队列中能获取到元素,就会执行 executeConfigListen 方法,对监听的配置信息进行检查,来保证客户端的配置信息是最新的!
该方法核心处理的工作分为以下几步
- 对
isSyncWithServer = true
监听器进行一次预检查,代表它已经向服务端同步一次了,但它默认值一般就是 false,首次进来都需要向服务端进行一次同步配置操作 - 遍历 CacheMap 集合中的元素,若 CacheData 中的 listener 不为空,存入到 listenCachesMap 集合,否则存入到 removeListenCachesMap 集合中
- 遍历 listenCachesMap 集合元素,组装生成 takId 的 RpcClient,向服务端发起 ConfigBatchListenRequest 请求,会做以下几件事情
- 客户端传入当前配置文件的 md5 值、属性:
listen=true
- 服务端会将传入的监听器-key->groupKey、value->connectionId,key->connectionId、value->groupKey 关系进行绑定
- 若 dataId-group-tenant 组合配置的 md5 值发生了改变,就把当前更改的数据进行返回
- 客户端此时对返回的配置文件数据进行再次 md5 比对,若不一致的话就会执行监听器的回调方法
- 修改当前的 CacheData 数据属性,lastModifiedTs=当前时间戳、isSyncWithServer=true
- 遍历 removeListenCachesMap 集合,组装生成 takId 的 RpcClient,向服务端发起 ConfigBatchListenRequest 请求,做以下几件事情
- 客户端传入当前配置文件的 md5 值、属性:
listen=false
- 服务端会将传入的监听器-key->groupKey、value->connectionId,key->connectionId、value->groupKey 关系进行移除
- 最后,从 CacheMap 集合中移除当前 CacheData 元素
- 判别 needAllSync 是否为 true(
代表全量同步,五分钟会执行一次全量同步操作
),若为 true,更新 lastAllSyncTime 最后同步时间为当前时间 - 若配置发生了变化,就往 listenExecutebell 阻塞队列中塞入元素
public void executeConfigListen() { Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16); Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16); long now = System.currentTimeMillis(); // 超出五分钟以后,进行全量配置信息的同步 boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; for (CacheData cache : cacheMap.get().values()) { synchronized (cache) { // isSyncWithServer 在后面会调整为 true,初始值都是 false if (cache.isSyncWithServer()) { // 检查监听器 md5 值 cache.checkListenerMd5(); if (!needAllSync) { continue; } } // 监听器不为空,添加到待监听器集合中 if (!CollectionUtils.isEmpty(cache.getListeners())) { // 不使用本地配置,默认值为 false if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<>(); listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache); } // 监听器为空,添加到待移除监听器集合中 } else if (CollectionUtils.isEmpty(cache.getListeners())) { if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<>(); removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache); } } } } boolean hasChangedKeys = false; // 若监听器集合不为空,进行遍历处理 if (!listenCachesMap.isEmpty()) { for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) { String taskId = entry.getKey(); Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2); List<CacheData> listenCaches = entry.getValue(); for (CacheData cacheData : listenCaches) { timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant), cacheData.getLastModifiedTs().longValue()); } ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); configChangeListenRequest.setListen(true); try { // 创建当前任务的 RpcClient 实例 RpcClient rpcClient = ensureRpcClient(taskId); // 向服务端发起请求,超时时长 3 秒 ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { Set<String> changeKeys = new HashSet<>(); // 若有配置发生了改变,检查后通知监听器 if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { hasChangedKeys = true; for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) { // 生成改变 Key,添加到集合中 String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant()); changeKeys.add(changeKey); boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); refreshContentAndCheck(changeKey, !isInitializing); } } // 处理配置内容 for (CacheData cacheData : listenCaches) { String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()); if (!changeKeys.contains(groupKey)) { //sync:cache data md5 = server md5 && cache data md5 = all listeners md5. synchronized (cacheData) { if (!cacheData.getListeners().isEmpty()) { // 通过 CAS 调整最后更新时间,避免其他地方同时对这个数据发生了修改 Long previousTimesStamp = timestampMap.get(groupKey); if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, System.currentTimeMillis())) { continue; } // isSyncWithServer 修改为 true cacheData.setSyncWithServer(true); } } } cacheData.setInitializing(false); } } } catch (Exception e) { LOGGER.error("Async listen config change error ", e); try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { //ignore } } } } // 若待移除的监听器集合不为空,进行遍历处理 if (!removeListenCachesMap.isEmpty()) { for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); List<CacheData> removeListenCaches = entry.getValue(); ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); configChangeListenRequest.setListen(false); try { RpcClient rpcClient = ensureRpcClient(taskId); // 取消-移除监听器 boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { synchronized (cacheData) { if (cacheData.getListeners().isEmpty()) { // 移除当前 CacheData ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); } } } } } catch (Exception e) { LOGGER.error("async remove listen config change error ", e); } try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { //ignore } } } if (needAllSync) { lastAllSyncTime = now; } //If has changed keys,notify re sync md5. if (hasChangedKeys) { notifyListenConfig(); } }
isSyncWithServer 属性比较重要,它在以下几种情况下都会是 false
/** * 1、添加监听器默认值是 false,需要检查 * 2、收到配置改变的通知,设置为 false,需要检查 * 3、监听器被移除,设置为 false,需要检查 */ public boolean isSyncWithServer() { return isSyncWithServer;
首次进来先要进行一次与服务端的数据比对过程,然后把该标识修改为 true,防止在下一个 5s 对没有更改过的配置再次调用服务端,浪费资源;当在 Nacos 控制台或通过接口方式修改了配置,服务端就会推送给客户端一个状态,也就是会把这个属性再次修改为 false,下一次就会主动去拉取服务端的数据进行 md5 值比对,主动拉取配置的方法源码如下:
private void refreshContentAndCheck(String groupKey, boolean notify) { if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) { CacheData cache = cacheMap.get().get(groupKey); refreshContentAndCheck(cache, notify); } } private void refreshContentAndCheck(CacheData cacheData, boolean notify) { try { // 获取服务端该配置文件下的最新信息 ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify); cacheData.setEncryptedDataKey(response.getEncryptedDataKey()); cacheData.setContent(response.getContent()); if (null != response.getConfigType()) { cacheData.setType(response.getConfigType()); } if (notify) { LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(), ContentUtils.truncateContent(response.getContent()), response.getConfigType()); } // 此处进行最新数据的 md5 值比对 cacheData.checkListenerMd5(); } catch (Exception e) { LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId, cacheData.group, cacheData.tenant, e); } }
通过 CacheData#checkListenerMd5 方法去对 md5 值比对,在该方法中会通知 listener 监听器的持有者
void checkListenerMd5() { // 遍历监听器,比对 MD5 值,不同的话就调用监听器回调方法告知配置变更了! for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap); } } } // 摘自 safeNotifyListener 方法中的部分源码 ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); cr.setEncryptedDataKey(encryptedDataKey); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); listenerWrap.inNotifying = true; // 会调用 innerReceive 方法,触发客户端侧回调逻辑 listener.receiveConfigInfo(contentTmp);
从以上方法中就能够知晓监听器的持有者
是如何感知到配置变化的
至于服务端是如何感知到客户端这些监听器的存在,它是通过 ConfigBatchListenRequest 请求传递的,主要看的是 listen 这个属性值,若为 true 服务端就会有一个集合将其维护起来,为 false 服务端就会从集合中将其移除
RpcClient 长连接
以上方法提到了会为每一组监听器创建 RpcClient,它由 RpcClient rpcClient = ensureRpcClient(taskId)
方法处理,如下是它的源码:
private RpcClient ensureRpcClient(String taskId) throws NacosException { synchronized (ClientWorker.this) { Map<String, String> labels = getLabels(); Map<String, String> newLabels = new HashMap<>(labels); newLabels.put("taskId", taskId); // 创建一个基于 GRPC 类型的客户端 RpcClient rpcClient = RpcClientFactory.createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels); if (rpcClient.isWaitInitiated()) { // 初始化客户端的处理器 initRpcClientHandler(rpcClient); rpcClient.setTenant(getTenant()); rpcClient.clientAbilities(initAbilities()); rpcClient.start(); } return rpcClient; } }
主要看的就是 initRpcClientHandler 方法做的事情,源码如下:
private void initRpcClientHandler(final RpcClient rpcClientInner) { /* * 处理来自服务端发起的请求 */ rpcClientInner.registerServerRequestHandler((request) -> { // 当前请求为配置更新通知请求,由 Nacos 服务端发起 if (request instanceof ConfigChangeNotifyRequest) { ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}", rpcClientInner.getName(), configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant()); String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),configChangeNotifyRequest.getTenant()); CacheData cacheData = cacheMap.get().get(groupKey); if (cacheData != null) { synchronized (cacheData) { cacheData.getLastModifiedTs().set(System.currentTimeMillis()); // isSyncWithServer 更新这个值为 false,就会再次调用服务端拉取配置 cacheData.setSyncWithServer(false); // listenExecutebell 阻塞队列塞入数据,进行下一个 5s 触发调用,更新配置信息 md5 值,进行比对后客户端侧监听回调处理 notifyListenConfig(); } } return new ConfigChangeNotifyResponse(); } return null; });
initRpcClientHandler 方法会接收来自服务端 ConfigChangeNotifyRequest 请求,服务端发布了 LocalDataChangeEvent 事件时会发送此请求,具体的流程在 Nacos 配置中心服务端原理及源码剖析
博文有详细剖析
RpcClient#start 方法及其重要,它在内部会去与 Nacos 服务端建立连接,随即在后面通过 RpcClient 去调用时,服务端才能够拿到当前客户端的元数据信息,确保两边通信是安全态,它内部开启了两个线程去定时监测与服务端之间的连接状态!
// com.alibaba.nacos.common.remote.client.RpcClient#start // 创建两个核心线程数 clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.remote.worker"); t.setDaemon(true); return t; }); // 该线程用于收到连接成功或失败的状态,及时回调监听器处理连接成功、连接失败对应的方法 clientEventExecutor.submit(() -> { while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) { ConnectionEvent take; try { take = eventLinkedBlockingQueue.take(); if (take.isConnected()) { notifyConnected(); } else if (take.isDisConnected()) { notifyDisConnected(); } } catch (Throwable e) { // Do nothing } } }); // 该线程用于定时去监测连接是否可靠,若之前服务端下线了,该 RpcClient 5 秒后会去 clientEventExecutor.submit(() -> { while (true) { try { if (isShutdown()) { break; } // 阻塞 5 秒后放行,也就是 5 秒进行一次心跳监测 ReconnectContext reconnectContext = reconnectionSignal .poll(keepAliveTime, TimeUnit.MILLISECONDS); if (reconnectContext == null) { // 检查存活时长是否超过 5 s if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) { boolean isHealthy = healthCheck(); if (!isHealthy) { // 连接不健康并且当前连接是空就不处理 if (currentConnection == null) { continue; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId()); // 获取当前 Rpc 状态,若是终止状态就退出 RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get(); if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) { break; } boolean statusFLowSuccess = RpcClient.this.rpcClientStatus .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY); if (statusFLowSuccess) { reconnectContext = new ReconnectContext(null, false); } else { continue; } } else { // 更新时间 lastActiveTimeStamp = System.currentTimeMillis(); continue; } } else { continue; } } if (reconnectContext.serverInfo != null) { // clear recommend server if server is not in server list. boolean serverExist = false; for (String server : getServerListFactory().getServerList()) { ServerInfo serverInfo = resolveServerInfo(server); if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) { serverExist = true; reconnectContext.serverInfo.serverPort = serverInfo.serverPort; break; } } if (!serverExist) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress()); reconnectContext.serverInfo = null; } } // 重新建立新的连接,选择另外一个 UP 服务端节点进行连接,同时会往 eventLinkedBlockingQueue 队列中塞数据进行连接回调通知 reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail); } catch (Throwable throwable) { // Do nothing } } }); // 首次启动 RpcClient 会建立与服务端的连接,担保重试三次与服务端建立连接的机会 int startUpRetryTimes = RETRY_TIMES; while (startUpRetryTimes > 0 && connectToServer == null) { try { startUpRetryTimes--; ServerInfo serverInfo = nextRpcServer(); LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo); connectToServer = connectToServer(serverInfo); } catch (Throwable e) { LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes, e); } }
在这里总结一下,调用 RpcClient#start 方法,会创建两个线程
- 一个线程来负责监听连接状态,一直阻塞,直到 eventLinkedBlockingQueue 队列中存在元素,无论连接成功还是失败都会通知监听器去处理对应的回调方法,notifyConnected、notifyDisConnected
- 一个线程来通过 5s 定时监测的机制来确保客户端这边与服务端之间的通信是正常的,在集群模式下,若当前连接的服务节点不正常了,那么就会挑选另外一台服务节点进行重连,以确保客户端这边的正常操作,每次重连成功会往 eventLinkedBlockingQueue 塞入事件:ConnectionEvent.CONNECTED,若重连失败 塞入事件:ConnectionEvent.DISCONNECTED
- RpcClient 在最尾部在初始建立与服务节点的连接,同时往 eventLinkedBlockingQueue 塞入事件:ConnectionEvent.CONNECTED
总结
Nacos 2.x 抛弃了 1.x 采用的长轮询模式,替而代之的是长连接模式,通过心跳的机制定时去监测服务端实例状态是否正常,若不正常了,切换到另外一个服务端实例
客户端感应到了主动去拉取最新的配置数据进行 md5 比对,若与之前的 md5 不一样了,此时将发生变化的状态通知到 Listener 持有者,此时,持有者就会重新从 Nacos 服务端拿到最新的数据保存下来,在这里总结一下 Nacos 客户端关于配置这块内容应用到的轮询方案:
- Nacos Config Client 每 5 分钟会进行一次全量比对,有没有配置发生了变化
- Nacos Config Client 每 5 秒钟会去扫描一次,当前 CacheMap 集合元素是不是发生了配置变化
关于 Nacos 2.x 在架构上改进方案的更多内容解读,可以看这篇文章:支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型
博文主要介绍的是 Nacos 客户端这一侧的拉取是如何去工作的,同时提及到了 Nacos 服务端部分内容,关于 Nacos 服务端这一块推送的内容会单独有一篇文章进行详细解析!
如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!
大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!