在该方法中作了以下几件重要的事情:
- 创建三个线程用于全量数据拉取,区分 beta 灰度、tag 标签、普通任务,通过定时调度线程池每隔 6 小时执行一次
- 创建一个线程用于处理清除数据库中保存时间超过 30天 的配置数据,定时调度线程池每隔十分钟查询一次,若存在这样的数据,则通过持久化层进行删除,每一次执行执行只删除 1000 条数据
- DumpService#dumpConfigInfo 会默认执行,它在于 Nacos 服务端一启动就会进行全量数据的拉取,存储到本地
DumpService#dumpConfigInfo 方法
DumpService#dumpConfigInfo 会默认执行,它里面一启动会先删除所有本地文件存储的数据,然后把全部的配置数据都加载一次再次存入到本地文件中
在 DumpService#dumpOperate 方法还提及到了 heartBeat 文件,它用于确保集群节点未超过 6 小时还进行一次数据的拉取,避免再次启动还进行全量数据拉取,它只作数据的 md5 值比对,若发现了不一致的数据才会重新去加载!
具体源码如下:
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException { int timeStep = 6; boolean isAllDump = true; // initial dump all FileInputStream fis = null; Timestamp heartheatLastStamp = null; try { // 默认值 false if (isQuickStart()) { File heartbeatFile = DiskUtil.heartBeatFile(); if (heartbeatFile.exists()) { // 若 heartbeatFile 文件存在,并且时间未超过 6 个小时,就不再进行全量数据拉取 fis = new FileInputStream(heartbeatFile); String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE); heartheatLastStamp = Timestamp.valueOf(heartheatTempLast); if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) { isAllDump = false; } } } // 全量数据拉取操作 if (isAllDump) { LogUtil.DEFAULT_LOG.info("start clear all config-info."); // 清除所有的配置信息,/data/config-data 数据都会清除 DiskUtil.clearAll(); // 该方法会批次查询持久层数据,进行本地数据保存 dumpAllProcessor.process(new DumpAllTask()); } else { // 不进行数据拉取,只对持久层数据超过 6 小时的作数据校验 Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep); DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp, TimeUtils.getCurrentTime()); dumpChangeProcessor.process(new DumpChangeTask()); Runnable checkMd5Task = () -> { LogUtil.DEFAULT_LOG.error("start checkMd5Task"); // 取出本地 md5 与 CacheItem md5 值不同的数据 List<String> diffList = ConfigCacheService.checkMd5(); for (String groupKey : diffList) { String[] dg = GroupKey.parseKey(groupKey); String dataId = dg[0]; String group = dg[1]; String tenant = dg[2]; // 重新获取配置并更新本地数据以及 CacheItem 缓存 ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant); ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(), configInfo.getLastModified(), configInfo.getEncryptedDataKey()); } LogUtil.DEFAULT_LOG.error("end checkMd5Task"); }; // 刚开启执行一次,后面每隔 12 小时执行一次 ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS); } } catch (IOException e) { LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage()); throw e; } finally { if (null != fis) { try { fis.close(); } catch (IOException e) { LogUtil.DEFAULT_LOG.warn("close file failed"); } } } }
在分析源码过程中,多次提到了 CacheItem 对象以及本地文件数据,在这里解释一下:
CacheItem 对象的数据用于 md5 值比对,包括了与客户端传入的参数 md5 值比对、服务端本地文件数据的 md5 比对
本地文件数据是用来给客户端进行返回的,当客户端发起了 ConfigQueryRequest 请求时,会优先从本地文件读取数据后返回,而不是从数据库拿数据,主要就是为了减少对数据库造成的压力
DumpAllProcessor#process 方法
用于拉取全量数据,数据库每一条配置数据都会处理一次
public boolean process(NacosTask task) { long currentMaxId = persistService.findConfigMaxId(); long lastMaxId = 0; while (lastMaxId < currentMaxId) { // 每次查 1000 条记录出来 Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE); if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) { for (ConfigInfoWrapper cf : page.getPageItems()) { long id = cf.getId(); lastMaxId = Math.max(id, lastMaxId); // 元数据加载 if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) { AggrWhitelist.load(cf.getContent()); } if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { ClientIpWhiteList.load(cf.getContent()); } if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) { SwitchService.load(cf.getContent()); } // 存入 CacheItem、本地文件、通知客户端更新数据 ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(), cf.getType(), cf.getEncryptedDataKey()); final String content = cf.getContent(); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}", GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(), md5); } DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId); } else { lastMaxId += PAGE_SIZE; } } return true; }
每次取出 1000 条数据,针对每条数据作三步处理:
- 转换 CacheItem 对象存入到集合中
- 本地文件写入数据
- 发送通知给到客户端
DumpChangeProcessor#process 方法
用于对发生改变了的数据及时更新本地文件数据以及 CacheItem,然后及时通知客户端
public boolean process(NacosTask task) { LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime); LogUtil.DEFAULT_LOG.warn("updateMd5 start"); long startUpdateMd5 = System.currentTimeMillis(); // 查询最前面 1000 条数据 List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5(); LogUtil.DEFAULT_LOG.warn("updateMd5 count:{}", updateMd5List.size()); for (ConfigInfoWrapper config : updateMd5List) { // 与 CacheItem md5 值作比对,若不一致,优先通知客户端监听器及时比对拉取数据! final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup()); ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified(), config.getEncryptedDataKey()); } long endUpdateMd5 = System.currentTimeMillis(); LogUtil.DEFAULT_LOG.warn("updateMd5 done,cost:{}", endUpdateMd5 - startUpdateMd5); LogUtil.DEFAULT_LOG.warn("deletedConfig start"); long startDeletedConfigTime = System.currentTimeMillis(); // 查询 6 小时已删除的数据 List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime); LogUtil.DEFAULT_LOG.warn("deletedConfig count:{}", configDeleted.size()); for (ConfigInfo configInfo : configDeleted) { // 该数据若查询不到,直接从本地文件、CacheItem 中删除、通知客户端对比数据 if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()) == null) { ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); } } long endDeletedConfigTime = System.currentTimeMillis(); LogUtil.DEFAULT_LOG.warn("deletedConfig done,cost:{}", endDeletedConfigTime - startDeletedConfigTime); LogUtil.DEFAULT_LOG.warn("changeConfig start"); final long startChangeConfigTime = System.currentTimeMillis(); // 查询 6 小时内的数据 List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime); LogUtil.DEFAULT_LOG.warn("changeConfig count:{}", changeConfigs.size()); for (ConfigInfoWrapper cf : changeConfigs) { // 与本地文件 md5 值作比对,若不一致,先更新本地文件的内容,确保客户端拉取的配置信息是更新的,然后再更新 CacheItem 信息 ConfigCacheService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(), cf.getEncryptedDataKey()); final String content = cf.getContent(); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); LogUtil.DEFAULT_LOG .info("[dump-change-ok] {}, {}, length={}, md5={}", GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(), md5); } // 重新加载涉及到的元数据信息 ConfigCacheService.reloadConfig(); long endChangeConfigTime = System.currentTimeMillis(); LogUtil.DEFAULT_LOG.warn("changeConfig done,cost:{}", endChangeConfigTime - startChangeConfigTime); return true; }
- 更新 CacheItem md5 值,同时发布 LocalDataChangeEvent 事件通知客户端拉取数据进行数据对比
- 查询 6 个小时内在
数据库不存在的数据而在本地文件、CacheItem 存在的数据
,直接从本地文件先移除,然后移除当前 CacheItem 缓存,最后通知给客户端及时更新最新数据 - 查询 6 小时内未删除的数据,与本地进行 md5 值比对,若不一致的情况下发布 LocalDataChangeEvent 事件及时通知客户端
Dashboard、Open-API 更新配置
在控制台修改配置或通过 Open-APi 接口最终调用的都是 /v1/cs/configs
,它所对应的方法在 ConfigController#publishConfig
一开始它会直接存储到数据库中,确保数据库的数据是最新状态的,然后再发布 ConfigDataChangeEvent 配置变更事件去通过其他的 Nacos 服务节点以及对应节点所绑定的客户端
事件发布者会存入到队列中,若存入队列失败,说明该队列已经满了,那么就直接通知监听器处理,否则加入到队列中进行排队处理
处理 ConfigDataChangeEvent 事件的订阅者是 AsyncNotifyService,在它的构造函数中向事件中心 NotifyCenter 注入了此事件以及对应处理该事件的订阅者
@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // 向事件中心注入该事件 NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // 注册订阅者来监听 处理 ConfigDataChangeEvent 事件. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); // 任何类型的队列都可以支持 Queue<NotifySingleTask> httpQueue = new LinkedList<>(); Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>(); // 集群中每个节点都会往队列中塞入一条数据 for (Member member : ipList) { // 检查成员是否支持长连接,不支持往 HttpQueue 新增,否则往 RpcQueue 新增 if (!MemberUtil.isSupportedLongCon(member)) { httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } else { rpcQueue.add( new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); } } if (!httpQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { // 异步通知执行长连接的任务 ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); } } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); }
当配置发生了变化,Nacos 集群下的每个节点都要同步一份,向队列里塞入一份数据,区分 http、rpc 方式,分别存入到不同的队列,自 Nacos 2.x 以后,默认所有的成员节点都是支持 rpc 的,所以只需要分析 AsyncRpcTask 任务是如何异步处理的就可以了
AsyncRpcTask 处理过程
死循环遍历阻塞队列,每次都取出一条元素,封装 ConfigChangeClusterSyncRequest 请求
// AsyncRpcTask#run public void run() { while (!queue.isEmpty()) { NotifySingleRpcTask task = queue.poll(); ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); syncRequest.setDataId(task.getDataId()); syncRequest.setGroup(task.getGroup()); syncRequest.setBeta(task.isBeta); syncRequest.setLastModified(task.getLastModified()); syncRequest.setTag(task.tag); syncRequest.setTenant(task.getTenant()); Member member = task.member; if (memberManager.getSelf().equals(member)) { // 判别是否灰度 if (syncRequest.isBeta()) { dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true); } else { dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP()); } continue; } if (memberManager.hasMember(member.getAddress())) { // 检查当前成员节点是否健康,获取为空或 state != UP boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); if (unHealthNeedDelay) { // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, member.getAddress()); // 若该节点不健康,延迟去执行这个节点的任务 asyncTaskExecute(task); } else { if (!MemberUtil.isSupportedLongCon(member)) { asyncTaskExecute( new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, task.getLastModified(), member.getAddress(), task.isBeta)); } else { try { // 向节点发起请求配置同步,失败的话会延迟处理此任务进行重试 configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); } catch (Exception e) { MetricsMonitor.getConfigNotifyException().increment(); asyncTaskExecute(task); } } } } else { //No nothig if member has offline. } } }
ConfigChangeClusterSyncRequestHandler#handle 方法源码:
public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest, RequestMeta meta) throws NacosException { if (configChangeSyncRequest.isBeta()) { dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(), configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp(), true); } else { dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(), configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp()); } return new ConfigChangeClusterSyncResponse(); }
- 判断当前元素的成员信息是否是当前节点,若是的话,直接调用 DumpService#dump 方法进行处理,随即处理下一条元素
- 若不是的话,先校验当前元素的成员是否为健康状态,不健康的状态下延迟的去执行这个任务
- 再次校验当前元素成员是否支持长连接方式去处理任务,支持的话直接调用
ConfigClusterRpcClientProxy#syncConfigChange
方法,向当前元素成员节点发起 ConfigChangeClusterSyncRequest 请求,最终由 ConfigChangeClusterSyncRequestHandler#handle 方法进行处理 - 可以发现最终都是由
DumpService#dump
方法去处理后续逻辑的
继续分析 DumpService#dump 方法,该源码如下:
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }
之前在分析 DumpService 类加载过程时,提到了 dumpTaskMgr->DumpTaskManager 任务管理器组合 DumpProcessor
,在这里,它往任务管理器中添加了 Task,但仅仅是加了 Task 并未作任何的处理。
DumpTaskManager 将处理的工作委托给了它的父类 NacosDelayTaskExecuteEngine
,在它的父类里面会开启一个线程 ProcessRunnable 定时去处理这些任务
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) { this(name, initCapacity, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }
然后就是 ProcessRunnable#run —> NacosDelayTaskExecuteEngine#processTasks,通过 taskKey 获取其对应的处理器,之前在创建 dumpTaskMgr 给它制定了一个默认的处理器 DumpProcessor,所以最终会由它来处理这种任务
DumpProcessor#process 处理后续的逻辑,它最终会调用到 ConfigCacheService#dump 方法,该方法会把新更改的配置信息存入到本地文件中,同时更新 CacheItem md5 值,最后再发布 LocalDateChangeEvent 事件去通知客户端!整理流程图如下:
LocalDataChangeEvent
当配置涉及到发生改变时,都会发生 LocalDataEvent 事件,那么肯定有地方去处理,它交给了 RpcConfigChangeNotifier 类,直接在介绍 DumpService 子类 ExternalDumpService 时,它是依赖于此类的 @DependsOn({"rpcConfigChangeNotifier"})
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent>
继承至了 Subscriber 类,向注册中心添加了该订阅者,那么肯定就会实现 onEvent 方法进行处理了
public void onEvent(LocalDataChangeEvent event) { String groupKey = event.groupKey; boolean isBeta = event.isBeta; List<String> betaIps = event.betaIps; String[] strings = GroupKey.parseKey(groupKey); String dataId = strings[0]; String group = strings[1]; String tenant = strings.length > 2 ? strings[2] : ""; String tag = event.tag; // 核心在这 configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag); }
onEvent 方法只是将数据取出来,真正的处理在 configDataChanged 方法
public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta, List<String> betaIps, String tag) { // 获取到 groupKey 所有的客户端连接Id Set<String> listeners = configChangeListenContext.getListeners(groupKey); if (CollectionUtils.isEmpty(listeners)) { return; } int notifyClientCount = 0; for (final String client : listeners) { // 通过连接 Id 获取到客户端的连接实例 Connection connection = connectionManager.getConnection(client); if (connection == null) { continue; } ConnectionMeta metaInfo = connection.getMetaInfo(); //beta ips check. String clientIp = metaInfo.getClientIp(); String clientTag = metaInfo.getTag(); if (isBeta && betaIps != null && !betaIps.contains(clientIp)) { continue; } //tag check if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) { continue; } // 先构建 ConfigChangeNotifyRequest 请求体,再推送 RpcPushTask 任务 ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant); RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName()); push(rpcPushRetryTask); notifyClientCount++; } Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey); }
configChangeListenContext.getListeners 方法里面存储的数据是之前客户端向服务端注册监听器时添加进去的,在这里取出来就能知道要向那些客户端去同步配置已经改变的这个信息了
组装好 ConfigChangeNotifyRequest 请求后向对应的客户端实例发起请求,这个时候就轮到客户端这边来处理后续的工作了!
在 Nacos 客户端配置中心从浅入深原理及源码剖析 博文中提及到了在向服务端提交 ConfigBatchListenRequest 请求时,会先调用 ClientWorker.ConfigRpcTransportClient#ensureRpcClient 方法先创建好 RpcClient,在详细介绍 RpcClient 类时,在它创建时为它注入了一些请求处理器,最终也就会由它里面的处理器去接收后处理!更多内容可以详细此篇博文
总结
解决 Nacos 客户端配置中心从浅入深原理及源码剖析
文章遗留的知识, 基于客户端推送的查询配置、新增监听器请求,告知在服务端这边是如何处理的,对于客户端所有的请求,服务端都有一个单独的处理类去进行处理,由其父类 RequestHandler 统一管理,转发子类去处理核心逻辑
Nacos Server 服务节点在启动时,总结以下几个重要的点:
- 通过 heartbeatFile 文件写入的时长是否超过 6 小时,来判断是否进行全量数据拉取还是对可变的数据进行重新加载
- DumpAllTaskManager 任务管理器,6 小时执行一次全量数据的同步;DumpTaskManager 任务管理器,每隔 0.1 秒去监测是否有 DumpTask 任务需要处理,来确保在 Nacos 控制台上更新配置,能够及时响应给客户端,让客户端主动的来拉取最新的配置信息
- 服务端同步到给客户端动作,基于异步的方式进行传递;同时配置中心用到了 CacheItem 缓存机制来与客户端请求的 md5 值作比对、本地文件的方式存储来减少对数据库造成的压力,每次客户端拉取数据都直接从本地文件中获取,不去请求数据库,数据库这一层只负责作数据的可靠性存储!
如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!
大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!