Nacos 客户端配置中心从浅入深原理及源码剖析(上)

简介: Nacos 客户端配置中心从浅入深原理及源码剖析(上)

前言

在之前文章一文带你从零到一深入透析 @RefreshScope 结合 Nacos 动态刷新源码,提及到了

  1. NacosPropertySourceLocator#locate 加载客户端应用配置信息时,会调用 NacosConfigService#getConfig 方法,去请求服务端从本地或数据库中获取配置
  2. NacosContextRefresher#onApplicationEvent 应用准备就绪后,会去调用 NacosConfigService#addListener 方法,它用于注册监听器,后续发生动态配置信息变更后,它会回调:1、进行环境对象属性信息重新加载,启动 bootstrap 父容器,再调用 getConfig 方法获取最新的配置信息;2、销毁 @RefreshScope 标注的 Bean 对象

注册中心,Nacos 有核心类 NacosNamingService,而在配置中心,Nacos 有核心类 NacosConfigService

在这里,会分析以上两个方法在客户端是如何去处理交互的,以及在客户端比较核心的一个长轮询机制

NacosConfigService 核心类加载过程

Nacos 1.0 中,配置基于 Http 代理模拟长连接方式实现去拉取配置,而在 2.0 版本以后进行了架构改进,调整为了基于 Grpc 协议实现真正的长连接

在 NacosConfigService 构造方法中,实例化了一个 ClientWorker 客户端搬运工核心类,由它来负责长连接,保持与 Nacos 服务端配置的及时同步,具体源码如下:

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
                    final Properties properties) throws NacosException {
  this.configFilterChainManager = configFilterChainManager;
  init(properties);
  // Grpc 客户端配置调用类
  agent = new ConfigRpcTransportClient(properties, serverListManager);
  int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
  // 创建一个定时任务线程池
  ScheduledExecutorService executorService = Executors
    .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
      Thread t = new Thread(r);
      t.setName("com.alibaba.nacos.client.Worker");
      t.setDaemon(true);
      return t;
    });
  agent.setExecutor(executorService);
  agent.start();
}

它内部通过 ConfigRpcTransportClient 类(它连接类型为 Grpc)作为 agent 代理进行工作,先创建好线程数最少为 2 个定时调度线程池,再启动搬运工

public void start() throws NacosException {
  // 客户端请求鉴权
  securityProxy.login(this.properties);
  // 五分钟进行一次鉴权
  this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
                                       this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
  // ConfigRpcTransportClient#startInternal
  startInternal();
}

先维护好客户端与服务端之间的鉴权工作,然后每间隔五秒钟进行一次数据的校验比对、同步获取最新配置信息

public void startInternal() {
  executor.schedule(() -> {
    while (!executor.isShutdown() && !executor.isTerminated()) {
      try {
        // 五秒钟定时进行一次拉取
        listenExecutebell.poll(5L, TimeUnit.SECONDS);
        if (executor.isShutdown() || executor.isTerminated()) {
          continue;
        }
        executeConfigListen();
      } catch (Exception e) {
        LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
      }
    }
  }, 0L, TimeUnit.MILLISECONDS);
}

从这里就是 NacosConfigService 类初始化做的准备工作,后续的工作会往 listenExecutebell 阻塞队列中塞入元素,executeConfigListen 配置监听方法才会继续往下处理

NacosConfigService#getConfig 方法

从 Nacos 获取配置信息优先级如下

  1. 从本地故障转移 faillover 文件中获取内容,存在直接通过该方法获取配置信息,该文件不由客户端程序去进行维护,而是由我们开发人员手动为它去添加,避免出现客户端紧急重启且同一时间更改了配置、Nacos 服务端下线情况出现
  2. 向 Nacos 服务端发出 ConfigQueryRequest 请求拉取配置信息,服务端响应 ConfigResponse,获取 context 内容返回
  3. 若服务端返回的不是鉴权错误时,就从本地快照文件中获取配置内容,在客户端同步到配置以后,会往本地快照文件中存储一份数据,当配置发生新的改变时,会对该快照文件进行重写

getConfig 方法的内部由 getConfigInner 方法实现,源码如下:

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
  group = blank2defaultGroup(group);
  ParamUtils.checkKeyParam(dataId, group);
  ConfigResponse cr = new ConfigResponse();
  cr.setDataId(dataId);
  cr.setTenant(tenant);
  cr.setGroup(group);
  // 先从本地故障转移文件获取内容,由客户端程序维护,而是由用户自身去维护的
  // 这是专为特定场景,例如:客户端-紧急重启此时在同一时间改变了配置 或 Nacos 服务器下线时
  String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  }
  try {
    // ConfigRpcTransportClient#queryConfig
    ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
    cr.setContent(response.getContent());
    cr.setEncryptedDataKey(response.getEncryptedDataKey());
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  } catch (NacosException ioe) {
    if (NacosException.NO_RIGHT == ioe.getErrCode()) {
      throw ioe;
    }
    LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                worker.getAgentName(), dataId, group, tenant, ioe.toString());
  }
  // 从本地快照中获取内容
  content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
  }
  cr.setContent(content);
  // 获取本地加密后的缓存文件,空代表本地不存在文件或抛出异常
  String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
  cr.setEncryptedDataKey(encryptedDataKey);
  configFilterChainManager.doFilter(null, cr);
  content = cr.getContent();
  return content;
}

在这里主要分析 ConfigRpcTransportClient#queryConfig 方法是如何去往客户端进行拉取的

  1. 获取到处理该请求任务的 Grpc 客户端调用实例
  2. 通过 Grpc 实例向服务端发起 ConfigQueryRequest 请求,在服务端侧由 ConfigQueryRequestHandler 类从持久化层或本地磁盘文件中获取后进行处理,返回读取到的配置信息
  3. 基于服务端返回的配置信息先存储到本地快照文件中以后再进行返回
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
  throws NacosException {
  ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
  request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
  // 创建一个基于 Grpc 调用的客户端
  RpcClient rpcClient = getOneRunningClient();
  if (notify) {
    CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
    if (cacheData != null) {
      rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
    }
  }
  // 基于创建好的客户端连接向服务端发起请求
  ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
  ConfigResponse configResponse = new ConfigResponse();
  if (response.isSuccess()) {
    // 将服务端响应成功的配置内容存入本地快照中
    LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
    configResponse.setContent(response.getContent());
    String configType;
    if (StringUtils.isNotBlank(response.getContentType())) {
      configType = response.getContentType();
    } else {
      configType = ConfigType.TEXT.getType();
    }
    configResponse.setConfigType(configType);
    String encryptedDataKey = response.getEncryptedDataKey();
    LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
    configResponse.setEncryptedDataKey(encryptedDataKey);
    return configResponse;
    .....
}

NacosConfigService#addListener 方法

该方法会涉及到往 CacheMap 缓存集合新增 CacheData 元素,在 NacosConfigService 核心类加载过程中提到了 executeConfigListen 方法,其实它就会遍历 CacheMap 中元素,挨个进行判别配置是否发生变更,以便来向 CacheData 里的监听器进行回调,让客户端重新拉取最新的配置信息!

由客户端搬运工来执行 ClientWorker#addTenantListeners 方法,源码如下:

// NacosConfigService#addListener -> ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
  throws NacosException {
  group = blank2defaultGroup(group);
  String tenant = agent.getTenant();
  // 若当前 dataId-group-tenant 不存在时,往 CacheMap 缓存中添加元素
  CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
  synchronized (cache) {
    // 塞入监听器组、队列元素
    for (Listener listener : listeners) {
      cache.addListener(listener);
    }
    cache.setSyncWithServer(false);
    // 往 listenExecutebell 阻塞队列中塞入元素
    agent.notifyListenConfig();
  }
}
  1. 往 CacheMap 集合中新增 CacheData 元素
  2. 往 CacheData 实例中添加当前客户端传入的监听器
  3. 往 ConfigRpcTransportClient#listenExecutebell 阻塞队列中塞入元素,此时搬运工开辟的线程就会从队列中取到元素,执行 executeConfigListen 方法

接下来继续分析一下 addCacheDataIfAbsent 方法是如何处理的

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
  CacheData cache = getCache(dataId, group, tenant);
  if (null != cache) {
    return cache;
  }
  String key = GroupKey.getKeyTenant(dataId, group, tenant);
  synchronized (cacheMap) {
    CacheData cacheFromMap = getCache(dataId, group, tenant);
    // 避免在相同 data-id、group 存在竞争,进行双重检查
    if (null != cacheFromMap) {
      cache = cacheFromMap;
      // reset so that server not hang this check
      cache.setInitializing(true);
    } else {
      cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
      // ParamUtil#getPerTaskConfigSize = 3000
      int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
      cache.setTaskId(taskId);
      // fix issue # https://github.com/alibaba/nacos/issues/1317
      // 为了解决旧版本,重复加载配置信息的过程
      if (enableRemoteSyncConfig) {
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        cache.setContent(response.getContent());
      }
    }
    // 更新 CacheMap 值
    Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
    copy.put(key, cache);
    cacheMap.set(copy);
  }
  LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
  MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
  return cache;
}

若 CacheMap 集合存在此【dataId、group、tenant 作为唯一标识】元素,直接返回,否则创建新的 CacheData 实例,存入到 CacheMap 中


目录
相关文章
|
6天前
|
存储 网络协议 Nacos
高效搭建Nacos:实现微服务的服务注册与配置中心
Nacos(Dynamic Naming and Configuration Service)是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台。它旨在帮助开发者更轻松地构建、部署和管理分布式系统,特别是在微服务架构中。
162 81
高效搭建Nacos:实现微服务的服务注册与配置中心
|
23天前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
140 10
|
2月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评。然而,“客户端不发送心跳检测”是使用中常见的问题之一。本文详细探讨了该问题的原因及解决方法,包括检查客户端配置、网络连接、日志、版本兼容性、心跳检测策略、服务实例注册状态、重启应用及环境变量等步骤,旨在帮助开发者快速定位并解决问题,确保服务正常运行。
51 5
|
2月前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。
169 10
|
2月前
|
网络安全 Nacos 开发者
Nacos作为流行的微服务注册与配置中心,“节点提示暂时不可用”是常见的问题之一
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,“节点提示暂时不可用”是常见的问题之一。本文将探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务的正常运行。通过检查服务实例状态、网络连接、Nacos配置、调整健康检查策略等步骤,可以有效解决这一问题。
41 4
|
2月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,实际使用中常遇到“客户端不发送心跳检测”的问题。本文深入探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务正常运行。通过检查客户端配置、网络连接、日志、版本兼容性、心跳策略、注册状态、重启应用和环境变量等步骤,系统地排查和解决这一问题。
56 3
|
2月前
|
安全 Nacos 数据库
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改。本文详细探讨了这一问题的原因及解决方案,包括限制公网访问、使用HTTPS、强化数据库安全、启用访问控制、监控和审计等步骤,帮助开发者确保服务的安全运行。
72 3
|
6月前
|
Java Nacos 数据库
使用 nacos 搭建注册中心及配置中心
使用 nacos 搭建注册中心及配置中心
110 5
|
6月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
189 3
|
2月前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
122 1
Nacos配置中心