Nacos 服务端配置中心从浅入深原理及源码剖析(上)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: Nacos 服务端配置中心从浅入深原理及源码剖析(上)

前言

Nacos 客户端配置中心从浅入深原理及源码剖析 文章结合看,提到了客户端会向服务端这一侧发出两个不同类型的请求,分别是:ConfigQueryRequest、ConfigBatchListenRequest

这里会介绍这两个请求在服务端是如何去处理的流程,以及通过 Nacos Dashboard 或 Open API 接口更改了配置以后,在服务端这边到底做了些什么事情,会分析到 DumpService 加载过程以及它去如何去通知客户端配置变更的

服务端处理请求入口

来自客户端发起所有的请求,都有对应的请求处理器进行处理,首先它会在容器刷新前先把所有的 RequestHandler 请求处理器进行注入,以下是通过 Debug 看到的这些处理器

查看 RequestHandlerRegistry#onApplicationEvent 方法,它监听了 ContextRefreshedEvent 事件,所有单例 Bean 加载完以后,会发布此事件,该方法源码如下:

public void onApplicationEvent(ContextRefreshedEvent event) {
  Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
  Collection<RequestHandler> values = beansOfType.values();
  for (RequestHandler requestHandler : values) {
    Class<?> clazz = requestHandler.getClass();
    boolean skip = false;
    while (!clazz.getSuperclass().equals(RequestHandler.class)) {
      if (clazz.getSuperclass().equals(Object.class)) {
        skip = true;
        break;
      }
      clazz = clazz.getSuperclass();
    }
    if (skip) {
      continue;
    }
    try {
      Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
      if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
        TpsControl tpsControl = method.getAnnotation(TpsControl.class);
        String pointName = tpsControl.pointName();
        TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
        tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
      }
    } catch (Exception e) {
      //ignore.
    }
    Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
    // 此处往集合中注册请求处理器
    registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
  }
}
// 通过请求类型取出对应的处理器
public RequestHandler getByRequestType(String requestType) {
  return registryHandlers.get(requestType);
}

最终在代码中会调用到 RequestHandler#handleRequest 此方法,通过钩子函数调用子类的 handle 方法,当前类 RequestHandler#handle 方法是空实现!

public Response handleRequest(T request, RequestMeta meta) throws NacosException {
  for (AbstractRequestFilter filter : requestFilters.filters) {
    try {
      Response filterResult = filter.filter(request, meta, this.getClass());
      if (filterResult != null && !filterResult.isSuccess()) {
        return filterResult;
      }
    } catch (Throwable throwable) {
      Loggers.REMOTE.error("filter error", throwable);
    }
  }
  return handle(request, meta);
}

该小节分析的是请求处理器如何进来的,主要是要想清楚,它这么多的不同请求类型处理器,想必它最终的入口就只有一个吧,这样看来,入口就在它们的共同父类:RequestHandler

ConfigQueryRequestHandler#handle 方法

通过该类来处理客户端发出的 ConfigQueryRequest 请求,寓意:获取配置信息的请求,源码中该处理器分为以下几步进行处理:

  1. 优先从 ConfigCacheService#CACHE 缓存集合中获取 CacheItem 数据的 md5 值,Nacos 服务端一启动会往该缓存中塞入数据,以及在更新配置以后会更新对应元素的数据信息
  2. 若对当前的配置文件没有打标签或灰度标识的话,默认会优先从本地缓存中读取数据,避免对数据库造成读取压力
  3. 最后组装数据成 ConfigQueryResponse 响应体返回给客户端

摘取部分核心源码如下:

// 灰度发布
if (isBeta) {
  // .....
} else {
  if (StringUtils.isBlank(tag)) {
    // 打标签
    if (isUseTag(cacheItem, autoTag)) {
      // ........
    } else {
      md5 = cacheItem.getMd5();
      lastModified = cacheItem.getLastModifiedTs();
      // 若直接读取,则从持久层获取数据
      if (PropertyUtil.isDirectRead()) {
        configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
      } else {
        // 从本地缓存文件中进行读取
        file = DiskUtil.targetFile(dataId, group, tenant);
      }
      if (configInfoBase == null && fileNotExist(file)) {
        // FIXME CacheItem
        // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
        ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                        ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);
        // 配置未存在
        response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
        return response;
      }
    }
  }
}
response.setMd5(md5);
// 
if (PropertyUtil.isDirectRead()) {
  response.setLastModified(lastModified);
  response.setContent(configInfoBase.getContent());
  response.setEncryptedDataKey(configInfoBase.getEncryptedDataKey());
  response.setResultCode(ResponseCode.SUCCESS.getCode());
} else {
  // 将文件读取为字符进行返回
  String content = null;
  try {
    content = readFileContent(file);
    response.setContent(content);
    response.setLastModified(lastModified);
    response.setResultCode(ResponseCode.SUCCESS.getCode());
    if (isBeta) {
      response.setEncryptedDataKey(cacheItem.getEncryptedDataKeyBeta());
    } else {
      response.setEncryptedDataKey(cacheItem.getEncryptedDataKey());
    }
  } catch (IOException e) {
    response.setErrorInfo(ResponseCode.FAIL.getCode(), e.getMessage());
    return response;
  }
}

PropertyUtil#isDirectRead 用于判定是否直接读取,它从一定程度上减少对数据库或 Leader 节点造成压力,如下:

/**
 * 决定是否直接读取数据,若配置是 MySQL 持久化,会返回 false
 * 若使用 MySQL,减少数据库读取压力
 * 若使用 Raft、Derby,减少领导节点读取压力
 * @return 是否直接通过持久化方式读取
 */
public static boolean isDirectRead() {
  return EnvUtil.getStandaloneMode() && isEmbeddedStorage();
}

ConfigBatchListenRequest#handle 方法

客户端应用启动就绪后会向 Nacos Client 调用 addListener 方法新增监听器,当第一个五秒执行 executeConfigListen 方法时,会组装所有的 listener 向 Nacos Server 端发起 ConfigBatchListenRequest 请求

随即服务端就从处理器注册表:registryHandlers 中,取出该请求类型的对应处理器 ConfigChangeBatchListenRequestHandler#handle 方法处理,处理过程如下

  1. 遍历 Nacos Client 传入的 List<ConfigListenContext> 集合,根据 dataId、group、tenant 组合生成 groupKey
  2. 判断其 listen 状态,若为 true 新增元素以 groupKey 作为 key、connectionId 作为 value,connectionId 作为 key、groupKey 作为 value;若为 false 移除 groupKey 集合、connectionId 集合
  3. 若 listen 为 true 情况下,还需要作 md5 值比对,用服务端 CacheItem 元素的 md5 值与客户端传入的 md5 作比对,若不一致的话,将当前 dataId、group、tenant 值返回,Nacos Client 接收数据以后就会通知唤醒客户端的监听器.
public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta)
  throws NacosException {
  String connectionId = StringPool.get(meta.getConnectionId());
  String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);
  ConfigChangeBatchListenResponse configChangeBatchListenResponse = new ConfigChangeBatchListenResponse();
  for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest.getConfigListenContexts()) {
    String groupKey = GroupKey2.getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
    groupKey = StringPool.get(groupKey);
    String md5 = StringPool.get(listenContext.getMd5());
    if (configChangeListenRequest.isListen()) {
      // 新增 groupKeyContext、connectionIdContext 集合元素
      configChangeListenContext.addListen(groupKey, md5, connectionId);
      // 用 CacheItem 元素 md5 值与客户端值作对比,若不一致,会返回当前的 dataId 数据
      boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
      if (!isUptoDate) {
        configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(),
                                                        listenContext.getTenant());
      }
    } else {
      // 移除 groupKeyContext、connectionIdContext 集合元素
      configChangeListenContext.removeListen(groupKey, connectionId);
    }
  }
  return configChangeBatchListenResponse;
}

一个 groupKey 对应多个 connectionId,原因:一个配置文件可能会在多个客户端中同时去使用

一个 connectionId 对应多个 groupKey,原因:一个客户端不仅仅是只会使用一个配置文件

DumpService 加载过程

DumpService 是 Nacos Server 作为配置中心比较核心的一个类,用它去负责保持配置数据是最新状态,同时由它去负责与 Nacos Client 之间的心跳;由于后续的控制台更新配置也会使用到此类,在这里先介绍该类的初始化以及它所做的一些事情!

DumpService 类初始化

由于它是抽象类,所以它的初始化过程必然会交由子类来进行实现,又因为我们必然会使用 MySQL 进行配置,实现数据可靠性持久化,所以直接从它的子类ExternalDumpService 进行分析

ExternalDumpService vs EmbeddedDumpService

1、当前是单机模式下,采用的是内嵌的小数据库 Derby 存储若;当前是集群模式下,采用的是持久层可靠的 MySQL 数据库存储

2、内嵌数据库进行存储,重启后所有数据都会消失;而通过 MySQL 存储,下次启动数据仍然存在

3、它们都继承至 DumpService,但它们在容器中通过 Conditional 来约束只会存在其中一个实例,由于在大部分场景下,要支持高可用及数据可靠性,一般都会采用 ExternalDumpService 进行实现

4、它们的持久服务都对应不同的实现,ExternalDumpService 对应的 PersistService 实现为 ExternalStoragePersistServiceImpl,而 EmbeddedDumpService 对应的 PersistService 实现为 EmbeddedStoragePersistServiceImpl

先从 ExternalDumpService 构造方法开始介绍,它默认实现会调用父类 DumpService 构造方法,源码如下:

public ExternalDumpService(PersistService persistService, ServerMemberManager memberManager) {
  super(persistService, memberManager);
}
public DumpService(PersistService persistService, ServerMemberManager memberManager) {
  this.persistService = persistService;
  this.memberManager = memberManager;
  this.processor = new DumpProcessor(this);
  this.dumpAllProcessor = new DumpAllProcessor(this);
  this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
  this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
  // dumpTaskMgr->DumpTaskManager 任务管理器组合 DumpProcessor
  this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
  this.dumpTaskMgr.setDefaultTaskProcessor(processor);
  // dumpAllTaskMgr->DumpAllTaskManager 任务管理器组合 DumpAllProcessor
  this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
  this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
  // taskId=dumpAllConfigTask 由 DumpAllProcessor 进行处理
  this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
  // 区分 beta、tag、普通,分别会创建三个线程去处理.
  this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
  this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
  // 初始化数据源对象
  DynamicDataSource.getInstance().getDataSource();
}

ExternalDumpService#init 方法由 @PostConstruct 注解修饰,那么它就会该类初始化阶段进行调用,而它的 init 方法又是直接调用父类 DumpService#dumpOperate 方法,源码如下:

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
            DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
  String dumpFileContext = "CONFIG_DUMP_TO_FILE";
  TimerContext.start(dumpFileContext);
  try {
    LogUtil.DEFAULT_LOG.warn("DumpService start");
    // 新增全量拉取的任务
    Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
    // beata、tag 区分开来
    Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
    Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
    // 移除 1000 条超过 30 天数据的历史配置信息
    Runnable clearConfigHistory = () -> {
      LOGGER.warn("clearConfigHistory start");
      if (canExecute()) {
        try {
          Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
          int pageSize = 1000;
          LOGGER.warn("clearConfigHistory, getBeforeStamp:{}, pageSize:{}", startTime, pageSize);
          persistService.removeConfigHistory(startTime, pageSize);
        } catch (Throwable e) {
          LOGGER.error("clearConfigHistory error : {}", e.toString());
        }
      }
    };
    try {
      // 一启动默认先拉取全量数据存入本地缓存中,存储 CacheItem 元素的集合
      dumpConfigInfo(dumpAllProcessor);
      // update Beta cache
      LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
      DiskUtil.clearAllBeta();
      if (persistService.isExistTable(BETA_TABLE_NAME)) {
        dumpAllBetaProcessor.process(new DumpAllBetaTask());
      }
      // update Tag cache
      LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
      DiskUtil.clearAllTag();
      if (persistService.isExistTable(TAG_TABLE_NAME)) {
        dumpAllTagProcessor.process(new DumpAllTagTask());
      }
      // add to dump aggr
      List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
      if (configList != null && !configList.isEmpty()) {
        total = configList.size();
        List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
        for (List<ConfigInfoChanged> list : splitList) {
          MergeAllDataWorker work = new MergeAllDataWorker(list);
          work.start();
        }
        LOGGER.info("server start, schedule merge end.");
      }
    } catch (Exception e) {
      LogUtil.FATAL_LOG
        .error("Nacos Server did not start because dumpservice bean construction failure :\n" + e);
      throw new NacosException(NacosException.SERVER_ERROR,
                               "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                               e);
    }
    // 集群模式下
    if (!EnvUtil.getStandaloneMode()) {{
      Runnable heartbeat = () -> {
        String heartBeatTime = TimeUtils.getCurrentTime().toString();
        // 将心热时间写入到磁盘中,避免下次 Nacos 集群节点启动时,未超过 6 小时还去进行一次全量数据拉取,具体逻辑在 dumpConfigInfo 方法体现
        try {
          DiskUtil.saveHeartBeatToDisk(heartBeatTime);
        } catch (IOException e) {
          LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
        }
      };
      // 该任务 10 秒执行一次
      ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
      long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
      LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
      // 6*60 分钟也就是 6 小时执行一次全量数据拉取的任务
      ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      // 十分钟执行一次任务:移除 1000 条超过 30 天数据的历史配置信息
      ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
    }
  } finally {
    TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
  }
}

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
Nacos
nacos 配置页面的模糊查询
nacos 配置页面的模糊查询
|
7天前
|
机器学习/深度学习 Java Nacos
Nacos 配置中心(2023旧笔记)
Nacos 配置中心(2023旧笔记)
13 0
|
7天前
|
存储 前端开发 Java
第十一章 Spring Cloud Alibaba nacos配置中心
第十一章 Spring Cloud Alibaba nacos配置中心
15 0
|
10天前
|
敏捷开发 API 持续交付
云效产品使用常见问题之把云效上的配置发到Nacos上面去如何解决
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
23天前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
2月前
|
关系型数据库 MySQL Nacos
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
51 1
|
2月前
|
Nacos
nacos手动创建配置命名空间隔离
nacos手动创建配置命名空间隔离
21 1
|
2月前
|
编解码 Java Nacos
nacos常见问题之密码加密配置如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
151 0
|
2月前
|
安全 前端开发 Nacos
nacos常见问题之配置注册的白名单如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
104 0
|
2月前
|
缓存 关系型数据库 Nacos
nacos常见问题之服务端不开启鉴权日志一直报403如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
43 2