Nacos 服务端配置中心从浅入深原理及源码剖析(上)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Nacos 服务端配置中心从浅入深原理及源码剖析(上)

前言

Nacos 客户端配置中心从浅入深原理及源码剖析 文章结合看,提到了客户端会向服务端这一侧发出两个不同类型的请求,分别是:ConfigQueryRequest、ConfigBatchListenRequest

这里会介绍这两个请求在服务端是如何去处理的流程,以及通过 Nacos Dashboard 或 Open API 接口更改了配置以后,在服务端这边到底做了些什么事情,会分析到 DumpService 加载过程以及它去如何去通知客户端配置变更的

服务端处理请求入口

来自客户端发起所有的请求,都有对应的请求处理器进行处理,首先它会在容器刷新前先把所有的 RequestHandler 请求处理器进行注入,以下是通过 Debug 看到的这些处理器

查看 RequestHandlerRegistry#onApplicationEvent 方法,它监听了 ContextRefreshedEvent 事件,所有单例 Bean 加载完以后,会发布此事件,该方法源码如下:

public void onApplicationEvent(ContextRefreshedEvent event) {
  Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
  Collection<RequestHandler> values = beansOfType.values();
  for (RequestHandler requestHandler : values) {
    Class<?> clazz = requestHandler.getClass();
    boolean skip = false;
    while (!clazz.getSuperclass().equals(RequestHandler.class)) {
      if (clazz.getSuperclass().equals(Object.class)) {
        skip = true;
        break;
      }
      clazz = clazz.getSuperclass();
    }
    if (skip) {
      continue;
    }
    try {
      Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
      if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
        TpsControl tpsControl = method.getAnnotation(TpsControl.class);
        String pointName = tpsControl.pointName();
        TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
        tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
      }
    } catch (Exception e) {
      //ignore.
    }
    Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
    // 此处往集合中注册请求处理器
    registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
  }
}
// 通过请求类型取出对应的处理器
public RequestHandler getByRequestType(String requestType) {
  return registryHandlers.get(requestType);
}

最终在代码中会调用到 RequestHandler#handleRequest 此方法,通过钩子函数调用子类的 handle 方法,当前类 RequestHandler#handle 方法是空实现!

public Response handleRequest(T request, RequestMeta meta) throws NacosException {
  for (AbstractRequestFilter filter : requestFilters.filters) {
    try {
      Response filterResult = filter.filter(request, meta, this.getClass());
      if (filterResult != null && !filterResult.isSuccess()) {
        return filterResult;
      }
    } catch (Throwable throwable) {
      Loggers.REMOTE.error("filter error", throwable);
    }
  }
  return handle(request, meta);
}

该小节分析的是请求处理器如何进来的,主要是要想清楚,它这么多的不同请求类型处理器,想必它最终的入口就只有一个吧,这样看来,入口就在它们的共同父类:RequestHandler

ConfigQueryRequestHandler#handle 方法

通过该类来处理客户端发出的 ConfigQueryRequest 请求,寓意:获取配置信息的请求,源码中该处理器分为以下几步进行处理:

  1. 优先从 ConfigCacheService#CACHE 缓存集合中获取 CacheItem 数据的 md5 值,Nacos 服务端一启动会往该缓存中塞入数据,以及在更新配置以后会更新对应元素的数据信息
  2. 若对当前的配置文件没有打标签或灰度标识的话,默认会优先从本地缓存中读取数据,避免对数据库造成读取压力
  3. 最后组装数据成 ConfigQueryResponse 响应体返回给客户端

摘取部分核心源码如下:

// 灰度发布
if (isBeta) {
  // .....
} else {
  if (StringUtils.isBlank(tag)) {
    // 打标签
    if (isUseTag(cacheItem, autoTag)) {
      // ........
    } else {
      md5 = cacheItem.getMd5();
      lastModified = cacheItem.getLastModifiedTs();
      // 若直接读取,则从持久层获取数据
      if (PropertyUtil.isDirectRead()) {
        configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
      } else {
        // 从本地缓存文件中进行读取
        file = DiskUtil.targetFile(dataId, group, tenant);
      }
      if (configInfoBase == null && fileNotExist(file)) {
        // FIXME CacheItem
        // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
        ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                        ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);
        // 配置未存在
        response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
        return response;
      }
    }
  }
}
response.setMd5(md5);
// 
if (PropertyUtil.isDirectRead()) {
  response.setLastModified(lastModified);
  response.setContent(configInfoBase.getContent());
  response.setEncryptedDataKey(configInfoBase.getEncryptedDataKey());
  response.setResultCode(ResponseCode.SUCCESS.getCode());
} else {
  // 将文件读取为字符进行返回
  String content = null;
  try {
    content = readFileContent(file);
    response.setContent(content);
    response.setLastModified(lastModified);
    response.setResultCode(ResponseCode.SUCCESS.getCode());
    if (isBeta) {
      response.setEncryptedDataKey(cacheItem.getEncryptedDataKeyBeta());
    } else {
      response.setEncryptedDataKey(cacheItem.getEncryptedDataKey());
    }
  } catch (IOException e) {
    response.setErrorInfo(ResponseCode.FAIL.getCode(), e.getMessage());
    return response;
  }
}

PropertyUtil#isDirectRead 用于判定是否直接读取,它从一定程度上减少对数据库或 Leader 节点造成压力,如下:

/**
 * 决定是否直接读取数据,若配置是 MySQL 持久化,会返回 false
 * 若使用 MySQL,减少数据库读取压力
 * 若使用 Raft、Derby,减少领导节点读取压力
 * @return 是否直接通过持久化方式读取
 */
public static boolean isDirectRead() {
  return EnvUtil.getStandaloneMode() && isEmbeddedStorage();
}

ConfigBatchListenRequest#handle 方法

客户端应用启动就绪后会向 Nacos Client 调用 addListener 方法新增监听器,当第一个五秒执行 executeConfigListen 方法时,会组装所有的 listener 向 Nacos Server 端发起 ConfigBatchListenRequest 请求

随即服务端就从处理器注册表:registryHandlers 中,取出该请求类型的对应处理器 ConfigChangeBatchListenRequestHandler#handle 方法处理,处理过程如下

  1. 遍历 Nacos Client 传入的 List<ConfigListenContext> 集合,根据 dataId、group、tenant 组合生成 groupKey
  2. 判断其 listen 状态,若为 true 新增元素以 groupKey 作为 key、connectionId 作为 value,connectionId 作为 key、groupKey 作为 value;若为 false 移除 groupKey 集合、connectionId 集合
  3. 若 listen 为 true 情况下,还需要作 md5 值比对,用服务端 CacheItem 元素的 md5 值与客户端传入的 md5 作比对,若不一致的话,将当前 dataId、group、tenant 值返回,Nacos Client 接收数据以后就会通知唤醒客户端的监听器.
public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta)
  throws NacosException {
  String connectionId = StringPool.get(meta.getConnectionId());
  String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);
  ConfigChangeBatchListenResponse configChangeBatchListenResponse = new ConfigChangeBatchListenResponse();
  for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest.getConfigListenContexts()) {
    String groupKey = GroupKey2.getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
    groupKey = StringPool.get(groupKey);
    String md5 = StringPool.get(listenContext.getMd5());
    if (configChangeListenRequest.isListen()) {
      // 新增 groupKeyContext、connectionIdContext 集合元素
      configChangeListenContext.addListen(groupKey, md5, connectionId);
      // 用 CacheItem 元素 md5 值与客户端值作对比,若不一致,会返回当前的 dataId 数据
      boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
      if (!isUptoDate) {
        configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(),
                                                        listenContext.getTenant());
      }
    } else {
      // 移除 groupKeyContext、connectionIdContext 集合元素
      configChangeListenContext.removeListen(groupKey, connectionId);
    }
  }
  return configChangeBatchListenResponse;
}

一个 groupKey 对应多个 connectionId,原因:一个配置文件可能会在多个客户端中同时去使用

一个 connectionId 对应多个 groupKey,原因:一个客户端不仅仅是只会使用一个配置文件

DumpService 加载过程

DumpService 是 Nacos Server 作为配置中心比较核心的一个类,用它去负责保持配置数据是最新状态,同时由它去负责与 Nacos Client 之间的心跳;由于后续的控制台更新配置也会使用到此类,在这里先介绍该类的初始化以及它所做的一些事情!

DumpService 类初始化

由于它是抽象类,所以它的初始化过程必然会交由子类来进行实现,又因为我们必然会使用 MySQL 进行配置,实现数据可靠性持久化,所以直接从它的子类ExternalDumpService 进行分析

ExternalDumpService vs EmbeddedDumpService

1、当前是单机模式下,采用的是内嵌的小数据库 Derby 存储若;当前是集群模式下,采用的是持久层可靠的 MySQL 数据库存储

2、内嵌数据库进行存储,重启后所有数据都会消失;而通过 MySQL 存储,下次启动数据仍然存在

3、它们都继承至 DumpService,但它们在容器中通过 Conditional 来约束只会存在其中一个实例,由于在大部分场景下,要支持高可用及数据可靠性,一般都会采用 ExternalDumpService 进行实现

4、它们的持久服务都对应不同的实现,ExternalDumpService 对应的 PersistService 实现为 ExternalStoragePersistServiceImpl,而 EmbeddedDumpService 对应的 PersistService 实现为 EmbeddedStoragePersistServiceImpl

先从 ExternalDumpService 构造方法开始介绍,它默认实现会调用父类 DumpService 构造方法,源码如下:

public ExternalDumpService(PersistService persistService, ServerMemberManager memberManager) {
  super(persistService, memberManager);
}
public DumpService(PersistService persistService, ServerMemberManager memberManager) {
  this.persistService = persistService;
  this.memberManager = memberManager;
  this.processor = new DumpProcessor(this);
  this.dumpAllProcessor = new DumpAllProcessor(this);
  this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
  this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
  // dumpTaskMgr->DumpTaskManager 任务管理器组合 DumpProcessor
  this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
  this.dumpTaskMgr.setDefaultTaskProcessor(processor);
  // dumpAllTaskMgr->DumpAllTaskManager 任务管理器组合 DumpAllProcessor
  this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
  this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
  // taskId=dumpAllConfigTask 由 DumpAllProcessor 进行处理
  this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
  // 区分 beta、tag、普通,分别会创建三个线程去处理.
  this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
  this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
  // 初始化数据源对象
  DynamicDataSource.getInstance().getDataSource();
}

ExternalDumpService#init 方法由 @PostConstruct 注解修饰,那么它就会该类初始化阶段进行调用,而它的 init 方法又是直接调用父类 DumpService#dumpOperate 方法,源码如下:

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
            DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
  String dumpFileContext = "CONFIG_DUMP_TO_FILE";
  TimerContext.start(dumpFileContext);
  try {
    LogUtil.DEFAULT_LOG.warn("DumpService start");
    // 新增全量拉取的任务
    Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
    // beata、tag 区分开来
    Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
    Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
    // 移除 1000 条超过 30 天数据的历史配置信息
    Runnable clearConfigHistory = () -> {
      LOGGER.warn("clearConfigHistory start");
      if (canExecute()) {
        try {
          Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
          int pageSize = 1000;
          LOGGER.warn("clearConfigHistory, getBeforeStamp:{}, pageSize:{}", startTime, pageSize);
          persistService.removeConfigHistory(startTime, pageSize);
        } catch (Throwable e) {
          LOGGER.error("clearConfigHistory error : {}", e.toString());
        }
      }
    };
    try {
      // 一启动默认先拉取全量数据存入本地缓存中,存储 CacheItem 元素的集合
      dumpConfigInfo(dumpAllProcessor);
      // update Beta cache
      LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
      DiskUtil.clearAllBeta();
      if (persistService.isExistTable(BETA_TABLE_NAME)) {
        dumpAllBetaProcessor.process(new DumpAllBetaTask());
      }
      // update Tag cache
      LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
      DiskUtil.clearAllTag();
      if (persistService.isExistTable(TAG_TABLE_NAME)) {
        dumpAllTagProcessor.process(new DumpAllTagTask());
      }
      // add to dump aggr
      List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
      if (configList != null && !configList.isEmpty()) {
        total = configList.size();
        List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
        for (List<ConfigInfoChanged> list : splitList) {
          MergeAllDataWorker work = new MergeAllDataWorker(list);
          work.start();
        }
        LOGGER.info("server start, schedule merge end.");
      }
    } catch (Exception e) {
      LogUtil.FATAL_LOG
        .error("Nacos Server did not start because dumpservice bean construction failure :\n" + e);
      throw new NacosException(NacosException.SERVER_ERROR,
                               "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                               e);
    }
    // 集群模式下
    if (!EnvUtil.getStandaloneMode()) {{
      Runnable heartbeat = () -> {
        String heartBeatTime = TimeUtils.getCurrentTime().toString();
        // 将心热时间写入到磁盘中,避免下次 Nacos 集群节点启动时,未超过 6 小时还去进行一次全量数据拉取,具体逻辑在 dumpConfigInfo 方法体现
        try {
          DiskUtil.saveHeartBeatToDisk(heartBeatTime);
        } catch (IOException e) {
          LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
        }
      };
      // 该任务 10 秒执行一次
      ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
      long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
      LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
      // 6*60 分钟也就是 6 小时执行一次全量数据拉取的任务
      ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
      // 十分钟执行一次任务:移除 1000 条超过 30 天数据的历史配置信息
      ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
    }
  } finally {
    TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
  }
}

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6天前
|
存储 网络协议 Nacos
高效搭建Nacos:实现微服务的服务注册与配置中心
Nacos(Dynamic Naming and Configuration Service)是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台。它旨在帮助开发者更轻松地构建、部署和管理分布式系统,特别是在微服务架构中。
162 81
高效搭建Nacos:实现微服务的服务注册与配置中心
|
23天前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
140 10
|
2月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评。然而,“客户端不发送心跳检测”是使用中常见的问题之一。本文详细探讨了该问题的原因及解决方法,包括检查客户端配置、网络连接、日志、版本兼容性、心跳检测策略、服务实例注册状态、重启应用及环境变量等步骤,旨在帮助开发者快速定位并解决问题,确保服务正常运行。
51 5
|
2月前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。
169 10
|
2月前
|
网络安全 Nacos 开发者
Nacos作为流行的微服务注册与配置中心,“节点提示暂时不可用”是常见的问题之一
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,“节点提示暂时不可用”是常见的问题之一。本文将探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务的正常运行。通过检查服务实例状态、网络连接、Nacos配置、调整健康检查策略等步骤,可以有效解决这一问题。
41 4
|
6月前
|
Java Nacos 数据库
使用 nacos 搭建注册中心及配置中心
使用 nacos 搭建注册中心及配置中心
110 5
|
6月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
189 3
|
2月前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
122 1
Nacos配置中心
|
2月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,实际使用中常遇到“客户端不发送心跳检测”的问题。本文深入探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务正常运行。通过检查客户端配置、网络连接、日志、版本兼容性、心跳策略、注册状态、重启应用和环境变量等步骤,系统地排查和解决这一问题。
56 3
|
2月前
|
安全 Nacos 数据库
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改。本文详细探讨了这一问题的原因及解决方案,包括限制公网访问、使用HTTPS、强化数据库安全、启用访问控制、监控和审计等步骤,帮助开发者确保服务的安全运行。
72 3