前言
从 Nacos 客户端配置中心从浅入深原理及源码剖析
文章结合看,提到了客户端会向服务端这一侧发出两个不同类型的请求,分别是:ConfigQueryRequest、ConfigBatchListenRequest
这里会介绍这两个请求在服务端是如何去处理的流程,以及通过 Nacos Dashboard 或 Open API 接口更改了配置以后,在服务端这边到底做了些什么事情,会分析到 DumpService 加载过程以及它去如何去通知客户端配置变更的
服务端处理请求入口
来自客户端发起所有的请求,都有对应的请求处理器进行处理,首先它会在容器刷新前先把所有的 RequestHandler 请求处理器进行注入,以下是通过 Debug 看到的这些处理器
查看 RequestHandlerRegistry#onApplicationEvent 方法,它监听了 ContextRefreshedEvent 事件,所有单例 Bean 加载完以后,会发布此事件,该方法源码如下:
public void onApplicationEvent(ContextRefreshedEvent event) { Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class); Collection<RequestHandler> values = beansOfType.values(); for (RequestHandler requestHandler : values) { Class<?> clazz = requestHandler.getClass(); boolean skip = false; while (!clazz.getSuperclass().equals(RequestHandler.class)) { if (clazz.getSuperclass().equals(Object.class)) { skip = true; break; } clazz = clazz.getSuperclass(); } if (skip) { continue; } try { Method method = clazz.getMethod("handle", Request.class, RequestMeta.class); if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) { TpsControl tpsControl = method.getAnnotation(TpsControl.class); String pointName = tpsControl.pointName(); TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName); tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint); } } catch (Exception e) { //ignore. } Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0]; // 此处往集合中注册请求处理器 registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler); } } // 通过请求类型取出对应的处理器 public RequestHandler getByRequestType(String requestType) { return registryHandlers.get(requestType); }
最终在代码中会调用到 RequestHandler#handleRequest 此方法,通过钩子函数调用子类的 handle 方法,当前类 RequestHandler#handle 方法是空实现!
public Response handleRequest(T request, RequestMeta meta) throws NacosException { for (AbstractRequestFilter filter : requestFilters.filters) { try { Response filterResult = filter.filter(request, meta, this.getClass()); if (filterResult != null && !filterResult.isSuccess()) { return filterResult; } } catch (Throwable throwable) { Loggers.REMOTE.error("filter error", throwable); } } return handle(request, meta); }
该小节分析的是请求处理器如何进来的,主要是要想清楚,它这么多的不同请求类型处理器,想必它最终的入口就只有一个吧,这样看来,入口就在它们的共同父类:RequestHandler
ConfigQueryRequestHandler#handle 方法
通过该类来处理客户端发出的 ConfigQueryRequest 请求,寓意:获取配置信息的请求,源码中该处理器分为以下几步进行处理:
- 优先从 ConfigCacheService#CACHE 缓存集合中获取 CacheItem 数据的 md5 值,Nacos 服务端一启动会往该缓存中塞入数据,以及在更新配置以后会更新对应元素的数据信息
- 若对当前的配置文件没有打标签或灰度标识的话,默认会优先从本地缓存中读取数据,避免对数据库造成读取压力
- 最后组装数据成 ConfigQueryResponse 响应体返回给客户端
摘取部分核心源码如下:
// 灰度发布 if (isBeta) { // ..... } else { if (StringUtils.isBlank(tag)) { // 打标签 if (isUseTag(cacheItem, autoTag)) { // ........ } else { md5 = cacheItem.getMd5(); lastModified = cacheItem.getLastModifiedTs(); // 若直接读取,则从持久层获取数据 if (PropertyUtil.isDirectRead()) { configInfoBase = persistService.findConfigInfo(dataId, group, tenant); } else { // 从本地缓存文件中进行读取 file = DiskUtil.targetFile(dataId, group, tenant); } if (configInfoBase == null && fileNotExist(file)) { // FIXME CacheItem // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1. ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false); // 配置未存在 response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist"); return response; } } } } response.setMd5(md5); // if (PropertyUtil.isDirectRead()) { response.setLastModified(lastModified); response.setContent(configInfoBase.getContent()); response.setEncryptedDataKey(configInfoBase.getEncryptedDataKey()); response.setResultCode(ResponseCode.SUCCESS.getCode()); } else { // 将文件读取为字符进行返回 String content = null; try { content = readFileContent(file); response.setContent(content); response.setLastModified(lastModified); response.setResultCode(ResponseCode.SUCCESS.getCode()); if (isBeta) { response.setEncryptedDataKey(cacheItem.getEncryptedDataKeyBeta()); } else { response.setEncryptedDataKey(cacheItem.getEncryptedDataKey()); } } catch (IOException e) { response.setErrorInfo(ResponseCode.FAIL.getCode(), e.getMessage()); return response; } }
PropertyUtil#isDirectRead 用于判定是否直接读取,它从一定程度上减少对数据库或 Leader 节点造成压力,如下:
/** * 决定是否直接读取数据,若配置是 MySQL 持久化,会返回 false * 若使用 MySQL,减少数据库读取压力 * 若使用 Raft、Derby,减少领导节点读取压力 * @return 是否直接通过持久化方式读取 */ public static boolean isDirectRead() { return EnvUtil.getStandaloneMode() && isEmbeddedStorage(); }
ConfigBatchListenRequest#handle 方法
客户端应用启动就绪后会向 Nacos Client 调用 addListener 方法新增监听器,当第一个五秒执行 executeConfigListen 方法时,会组装所有的 listener 向 Nacos Server 端发起 ConfigBatchListenRequest 请求
随即服务端就从处理器注册表:registryHandlers 中,取出该请求类型的对应处理器 ConfigChangeBatchListenRequestHandler#handle 方法处理,处理过程如下
- 遍历 Nacos Client 传入的
List<ConfigListenContext>
集合,根据 dataId、group、tenant 组合生成 groupKey - 判断其 listen 状态,若为 true 新增元素以 groupKey 作为 key、connectionId 作为 value,connectionId 作为 key、groupKey 作为 value;若为 false 移除 groupKey 集合、connectionId 集合
- 若 listen 为 true 情况下,还需要作 md5 值比对,用服务端 CacheItem 元素的 md5 值与客户端传入的 md5 作比对,若不一致的话,将当前 dataId、group、tenant 值返回,Nacos Client 接收数据以后就会通知唤醒客户端的监听器.
public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta) throws NacosException { String connectionId = StringPool.get(meta.getConnectionId()); String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG); ConfigChangeBatchListenResponse configChangeBatchListenResponse = new ConfigChangeBatchListenResponse(); for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest.getConfigListenContexts()) { String groupKey = GroupKey2.getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant()); groupKey = StringPool.get(groupKey); String md5 = StringPool.get(listenContext.getMd5()); if (configChangeListenRequest.isListen()) { // 新增 groupKeyContext、connectionIdContext 集合元素 configChangeListenContext.addListen(groupKey, md5, connectionId); // 用 CacheItem 元素 md5 值与客户端值作对比,若不一致,会返回当前的 dataId 数据 boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag); if (!isUptoDate) { configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant()); } } else { // 移除 groupKeyContext、connectionIdContext 集合元素 configChangeListenContext.removeListen(groupKey, connectionId); } } return configChangeBatchListenResponse; }
一个 groupKey 对应多个 connectionId,原因:一个配置文件可能会在多个客户端中同时去使用
一个 connectionId 对应多个 groupKey,原因:一个客户端不仅仅是只会使用一个配置文件
DumpService 加载过程
DumpService 是 Nacos Server 作为配置中心比较核心的一个类,用它去负责保持配置数据是最新状态,同时由它去负责与 Nacos Client 之间的心跳;由于后续的控制台更新配置也会使用到此类,在这里先介绍该类的初始化以及它所做的一些事情!
DumpService 类初始化
由于它是抽象类,所以它的初始化过程必然会交由子类来进行实现,又因为我们必然会使用 MySQL 进行配置,实现数据可靠性持久化,所以直接从它的子类ExternalDumpService 进行分析
ExternalDumpService vs EmbeddedDumpService
1、当前是单机模式下,采用的是内嵌的小数据库 Derby 存储若;当前是集群模式下,采用的是持久层可靠的 MySQL 数据库存储
2、内嵌数据库进行存储,重启后所有数据都会消失;而通过 MySQL 存储,下次启动数据仍然存在
3、它们都继承至 DumpService,但它们在容器中通过 Conditional 来约束只会存在其中一个实例,由于在大部分场景下,要支持高可用及数据可靠性,一般都会采用 ExternalDumpService 进行实现
4、它们的持久服务都对应不同的实现,ExternalDumpService 对应的 PersistService 实现为 ExternalStoragePersistServiceImpl,而 EmbeddedDumpService 对应的 PersistService 实现为 EmbeddedStoragePersistServiceImpl
先从 ExternalDumpService 构造方法开始介绍,它默认实现会调用父类 DumpService 构造方法,源码如下:
public ExternalDumpService(PersistService persistService, ServerMemberManager memberManager) { super(persistService, memberManager); } public DumpService(PersistService persistService, ServerMemberManager memberManager) { this.persistService = persistService; this.memberManager = memberManager; this.processor = new DumpProcessor(this); this.dumpAllProcessor = new DumpAllProcessor(this); this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this); this.dumpAllTagProcessor = new DumpAllTagProcessor(this); // dumpTaskMgr->DumpTaskManager 任务管理器组合 DumpProcessor this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager"); this.dumpTaskMgr.setDefaultTaskProcessor(processor); // dumpAllTaskMgr->DumpAllTaskManager 任务管理器组合 DumpAllProcessor this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager"); this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor); // taskId=dumpAllConfigTask 由 DumpAllProcessor 进行处理 this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor); // 区分 beta、tag、普通,分别会创建三个线程去处理. this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor); this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor); // 初始化数据源对象 DynamicDataSource.getInstance().getDataSource(); }
ExternalDumpService#init 方法由 @PostConstruct 注解修饰,那么它就会该类初始化阶段进行调用,而它的 init 方法又是直接调用父类 DumpService#dumpOperate 方法,源码如下:
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor, DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException { String dumpFileContext = "CONFIG_DUMP_TO_FILE"; TimerContext.start(dumpFileContext); try { LogUtil.DEFAULT_LOG.warn("DumpService start"); // 新增全量拉取的任务 Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask()); // beata、tag 区分开来 Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask()); Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask()); // 移除 1000 条超过 30 天数据的历史配置信息 Runnable clearConfigHistory = () -> { LOGGER.warn("clearConfigHistory start"); if (canExecute()) { try { Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays()); int pageSize = 1000; LOGGER.warn("clearConfigHistory, getBeforeStamp:{}, pageSize:{}", startTime, pageSize); persistService.removeConfigHistory(startTime, pageSize); } catch (Throwable e) { LOGGER.error("clearConfigHistory error : {}", e.toString()); } } }; try { // 一启动默认先拉取全量数据存入本地缓存中,存储 CacheItem 元素的集合 dumpConfigInfo(dumpAllProcessor); // update Beta cache LogUtil.DEFAULT_LOG.info("start clear all config-info-beta."); DiskUtil.clearAllBeta(); if (persistService.isExistTable(BETA_TABLE_NAME)) { dumpAllBetaProcessor.process(new DumpAllBetaTask()); } // update Tag cache LogUtil.DEFAULT_LOG.info("start clear all config-info-tag."); DiskUtil.clearAllTag(); if (persistService.isExistTable(TAG_TABLE_NAME)) { dumpAllTagProcessor.process(new DumpAllTagTask()); } // add to dump aggr List<ConfigInfoChanged> configList = persistService.findAllAggrGroup(); if (configList != null && !configList.isEmpty()) { total = configList.size(); List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT); for (List<ConfigInfoChanged> list : splitList) { MergeAllDataWorker work = new MergeAllDataWorker(list); work.start(); } LOGGER.info("server start, schedule merge end."); } } catch (Exception e) { LogUtil.FATAL_LOG .error("Nacos Server did not start because dumpservice bean construction failure :\n" + e); throw new NacosException(NacosException.SERVER_ERROR, "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(), e); } // 集群模式下 if (!EnvUtil.getStandaloneMode()) {{ Runnable heartbeat = () -> { String heartBeatTime = TimeUtils.getCurrentTime().toString(); // 将心热时间写入到磁盘中,避免下次 Nacos 集群节点启动时,未超过 6 小时还去进行一次全量数据拉取,具体逻辑在 dumpConfigInfo 方法体现 try { DiskUtil.saveHeartBeatToDisk(heartBeatTime); } catch (IOException e) { LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage()); } }; // 该任务 10 秒执行一次 ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS); long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10; LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay); // 6*60 分钟也就是 6 小时执行一次全量数据拉取的任务 ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); // 十分钟执行一次任务:移除 1000 条超过 30 天数据的历史配置信息 ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES); } } finally { TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG); } }