Nacos 客户端配置中心从浅入深原理及源码剖析(上)

简介: Nacos 客户端配置中心从浅入深原理及源码剖析(上)

前言

在之前文章一文带你从零到一深入透析 @RefreshScope 结合 Nacos 动态刷新源码,提及到了

  1. NacosPropertySourceLocator#locate 加载客户端应用配置信息时,会调用 NacosConfigService#getConfig 方法,去请求服务端从本地或数据库中获取配置
  2. NacosContextRefresher#onApplicationEvent 应用准备就绪后,会去调用 NacosConfigService#addListener 方法,它用于注册监听器,后续发生动态配置信息变更后,它会回调:1、进行环境对象属性信息重新加载,启动 bootstrap 父容器,再调用 getConfig 方法获取最新的配置信息;2、销毁 @RefreshScope 标注的 Bean 对象

注册中心,Nacos 有核心类 NacosNamingService,而在配置中心,Nacos 有核心类 NacosConfigService

在这里,会分析以上两个方法在客户端是如何去处理交互的,以及在客户端比较核心的一个长轮询机制

NacosConfigService 核心类加载过程

Nacos 1.0 中,配置基于 Http 代理模拟长连接方式实现去拉取配置,而在 2.0 版本以后进行了架构改进,调整为了基于 Grpc 协议实现真正的长连接

在 NacosConfigService 构造方法中,实例化了一个 ClientWorker 客户端搬运工核心类,由它来负责长连接,保持与 Nacos 服务端配置的及时同步,具体源码如下:

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
                    final Properties properties) throws NacosException {
  this.configFilterChainManager = configFilterChainManager;
  init(properties);
  // Grpc 客户端配置调用类
  agent = new ConfigRpcTransportClient(properties, serverListManager);
  int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
  // 创建一个定时任务线程池
  ScheduledExecutorService executorService = Executors
    .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
      Thread t = new Thread(r);
      t.setName("com.alibaba.nacos.client.Worker");
      t.setDaemon(true);
      return t;
    });
  agent.setExecutor(executorService);
  agent.start();
}

它内部通过 ConfigRpcTransportClient 类(它连接类型为 Grpc)作为 agent 代理进行工作,先创建好线程数最少为 2 个定时调度线程池,再启动搬运工

public void start() throws NacosException {
  // 客户端请求鉴权
  securityProxy.login(this.properties);
  // 五分钟进行一次鉴权
  this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
                                       this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
  // ConfigRpcTransportClient#startInternal
  startInternal();
}

先维护好客户端与服务端之间的鉴权工作,然后每间隔五秒钟进行一次数据的校验比对、同步获取最新配置信息

public void startInternal() {
  executor.schedule(() -> {
    while (!executor.isShutdown() && !executor.isTerminated()) {
      try {
        // 五秒钟定时进行一次拉取
        listenExecutebell.poll(5L, TimeUnit.SECONDS);
        if (executor.isShutdown() || executor.isTerminated()) {
          continue;
        }
        executeConfigListen();
      } catch (Exception e) {
        LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
      }
    }
  }, 0L, TimeUnit.MILLISECONDS);
}

从这里就是 NacosConfigService 类初始化做的准备工作,后续的工作会往 listenExecutebell 阻塞队列中塞入元素,executeConfigListen 配置监听方法才会继续往下处理

NacosConfigService#getConfig 方法

从 Nacos 获取配置信息优先级如下

  1. 从本地故障转移 faillover 文件中获取内容,存在直接通过该方法获取配置信息,该文件不由客户端程序去进行维护,而是由我们开发人员手动为它去添加,避免出现客户端紧急重启且同一时间更改了配置、Nacos 服务端下线情况出现
  2. 向 Nacos 服务端发出 ConfigQueryRequest 请求拉取配置信息,服务端响应 ConfigResponse,获取 context 内容返回
  3. 若服务端返回的不是鉴权错误时,就从本地快照文件中获取配置内容,在客户端同步到配置以后,会往本地快照文件中存储一份数据,当配置发生新的改变时,会对该快照文件进行重写

getConfig 方法的内部由 getConfigInner 方法实现,源码如下:

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
  group = blank2defaultGroup(group);
  ParamUtils.checkKeyParam(dataId, group);
  ConfigResponse cr = new ConfigResponse();
  cr.setDataId(dataId);
  cr.setTenant(tenant);
  cr.setGroup(group);
  // 先从本地故障转移文件获取内容,由客户端程序维护,而是由用户自身去维护的
  // 这是专为特定场景,例如:客户端-紧急重启此时在同一时间改变了配置 或 Nacos 服务器下线时
  String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  }
  try {
    // ConfigRpcTransportClient#queryConfig
    ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
    cr.setContent(response.getContent());
    cr.setEncryptedDataKey(response.getEncryptedDataKey());
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  } catch (NacosException ioe) {
    if (NacosException.NO_RIGHT == ioe.getErrCode()) {
      throw ioe;
    }
    LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                worker.getAgentName(), dataId, group, tenant, ioe.toString());
  }
  // 从本地快照中获取内容
  content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
  }
  cr.setContent(content);
  // 获取本地加密后的缓存文件,空代表本地不存在文件或抛出异常
  String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
  cr.setEncryptedDataKey(encryptedDataKey);
  configFilterChainManager.doFilter(null, cr);
  content = cr.getContent();
  return content;
}

在这里主要分析 ConfigRpcTransportClient#queryConfig 方法是如何去往客户端进行拉取的

  1. 获取到处理该请求任务的 Grpc 客户端调用实例
  2. 通过 Grpc 实例向服务端发起 ConfigQueryRequest 请求,在服务端侧由 ConfigQueryRequestHandler 类从持久化层或本地磁盘文件中获取后进行处理,返回读取到的配置信息
  3. 基于服务端返回的配置信息先存储到本地快照文件中以后再进行返回
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
  throws NacosException {
  ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
  request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
  // 创建一个基于 Grpc 调用的客户端
  RpcClient rpcClient = getOneRunningClient();
  if (notify) {
    CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
    if (cacheData != null) {
      rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
    }
  }
  // 基于创建好的客户端连接向服务端发起请求
  ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
  ConfigResponse configResponse = new ConfigResponse();
  if (response.isSuccess()) {
    // 将服务端响应成功的配置内容存入本地快照中
    LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
    configResponse.setContent(response.getContent());
    String configType;
    if (StringUtils.isNotBlank(response.getContentType())) {
      configType = response.getContentType();
    } else {
      configType = ConfigType.TEXT.getType();
    }
    configResponse.setConfigType(configType);
    String encryptedDataKey = response.getEncryptedDataKey();
    LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
    configResponse.setEncryptedDataKey(encryptedDataKey);
    return configResponse;
    .....
}

NacosConfigService#addListener 方法

该方法会涉及到往 CacheMap 缓存集合新增 CacheData 元素,在 NacosConfigService 核心类加载过程中提到了 executeConfigListen 方法,其实它就会遍历 CacheMap 中元素,挨个进行判别配置是否发生变更,以便来向 CacheData 里的监听器进行回调,让客户端重新拉取最新的配置信息!

由客户端搬运工来执行 ClientWorker#addTenantListeners 方法,源码如下:

// NacosConfigService#addListener -> ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
  throws NacosException {
  group = blank2defaultGroup(group);
  String tenant = agent.getTenant();
  // 若当前 dataId-group-tenant 不存在时,往 CacheMap 缓存中添加元素
  CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
  synchronized (cache) {
    // 塞入监听器组、队列元素
    for (Listener listener : listeners) {
      cache.addListener(listener);
    }
    cache.setSyncWithServer(false);
    // 往 listenExecutebell 阻塞队列中塞入元素
    agent.notifyListenConfig();
  }
}
  1. 往 CacheMap 集合中新增 CacheData 元素
  2. 往 CacheData 实例中添加当前客户端传入的监听器
  3. 往 ConfigRpcTransportClient#listenExecutebell 阻塞队列中塞入元素,此时搬运工开辟的线程就会从队列中取到元素,执行 executeConfigListen 方法

接下来继续分析一下 addCacheDataIfAbsent 方法是如何处理的

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
  CacheData cache = getCache(dataId, group, tenant);
  if (null != cache) {
    return cache;
  }
  String key = GroupKey.getKeyTenant(dataId, group, tenant);
  synchronized (cacheMap) {
    CacheData cacheFromMap = getCache(dataId, group, tenant);
    // 避免在相同 data-id、group 存在竞争,进行双重检查
    if (null != cacheFromMap) {
      cache = cacheFromMap;
      // reset so that server not hang this check
      cache.setInitializing(true);
    } else {
      cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
      // ParamUtil#getPerTaskConfigSize = 3000
      int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
      cache.setTaskId(taskId);
      // fix issue # https://github.com/alibaba/nacos/issues/1317
      // 为了解决旧版本,重复加载配置信息的过程
      if (enableRemoteSyncConfig) {
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        cache.setContent(response.getContent());
      }
    }
    // 更新 CacheMap 值
    Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
    copy.put(key, cache);
    cacheMap.set(copy);
  }
  LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
  MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
  return cache;
}

若 CacheMap 集合存在此【dataId、group、tenant 作为唯一标识】元素,直接返回,否则创建新的 CacheData 实例,存入到 CacheMap 中


目录
相关文章
|
2天前
|
SpringCloudAlibaba 应用服务中间件 Nacos
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心(下)
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心
13 0
|
2天前
|
JSON SpringCloudAlibaba Java
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心(上)
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心
16 1
|
2天前
|
Nacos
nacos 配置页面的模糊查询
nacos 配置页面的模糊查询
|
2天前
|
机器学习/深度学习 Java Nacos
Nacos 配置中心(2023旧笔记)
Nacos 配置中心(2023旧笔记)
21 0
|
2天前
|
存储 前端开发 Java
第十一章 Spring Cloud Alibaba nacos配置中心
第十一章 Spring Cloud Alibaba nacos配置中心
28 0
|
2天前
|
敏捷开发 API 持续交付
云效产品使用常见问题之把云效上的配置发到Nacos上面去如何解决
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
2天前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
2天前
|
关系型数据库 MySQL Nacos
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
61 1
|
2天前
|
Nacos
nacos手动创建配置命名空间隔离
nacos手动创建配置命名空间隔离
25 1
|
2天前
|
编解码 Java Nacos
nacos常见问题之密码加密配置如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
191 0

热门文章

最新文章