Nacos 客户端配置中心从浅入深原理及源码剖析(上)

简介: Nacos 客户端配置中心从浅入深原理及源码剖析(上)

前言

在之前文章一文带你从零到一深入透析 @RefreshScope 结合 Nacos 动态刷新源码,提及到了

  1. NacosPropertySourceLocator#locate 加载客户端应用配置信息时,会调用 NacosConfigService#getConfig 方法,去请求服务端从本地或数据库中获取配置
  2. NacosContextRefresher#onApplicationEvent 应用准备就绪后,会去调用 NacosConfigService#addListener 方法,它用于注册监听器,后续发生动态配置信息变更后,它会回调:1、进行环境对象属性信息重新加载,启动 bootstrap 父容器,再调用 getConfig 方法获取最新的配置信息;2、销毁 @RefreshScope 标注的 Bean 对象

注册中心,Nacos 有核心类 NacosNamingService,而在配置中心,Nacos 有核心类 NacosConfigService

在这里,会分析以上两个方法在客户端是如何去处理交互的,以及在客户端比较核心的一个长轮询机制

NacosConfigService 核心类加载过程

Nacos 1.0 中,配置基于 Http 代理模拟长连接方式实现去拉取配置,而在 2.0 版本以后进行了架构改进,调整为了基于 Grpc 协议实现真正的长连接

在 NacosConfigService 构造方法中,实例化了一个 ClientWorker 客户端搬运工核心类,由它来负责长连接,保持与 Nacos 服务端配置的及时同步,具体源码如下:

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
                    final Properties properties) throws NacosException {
  this.configFilterChainManager = configFilterChainManager;
  init(properties);
  // Grpc 客户端配置调用类
  agent = new ConfigRpcTransportClient(properties, serverListManager);
  int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
  // 创建一个定时任务线程池
  ScheduledExecutorService executorService = Executors
    .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
      Thread t = new Thread(r);
      t.setName("com.alibaba.nacos.client.Worker");
      t.setDaemon(true);
      return t;
    });
  agent.setExecutor(executorService);
  agent.start();
}

它内部通过 ConfigRpcTransportClient 类(它连接类型为 Grpc)作为 agent 代理进行工作,先创建好线程数最少为 2 个定时调度线程池,再启动搬运工

public void start() throws NacosException {
  // 客户端请求鉴权
  securityProxy.login(this.properties);
  // 五分钟进行一次鉴权
  this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
                                       this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
  // ConfigRpcTransportClient#startInternal
  startInternal();
}

先维护好客户端与服务端之间的鉴权工作,然后每间隔五秒钟进行一次数据的校验比对、同步获取最新配置信息

public void startInternal() {
  executor.schedule(() -> {
    while (!executor.isShutdown() && !executor.isTerminated()) {
      try {
        // 五秒钟定时进行一次拉取
        listenExecutebell.poll(5L, TimeUnit.SECONDS);
        if (executor.isShutdown() || executor.isTerminated()) {
          continue;
        }
        executeConfigListen();
      } catch (Exception e) {
        LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
      }
    }
  }, 0L, TimeUnit.MILLISECONDS);
}

从这里就是 NacosConfigService 类初始化做的准备工作,后续的工作会往 listenExecutebell 阻塞队列中塞入元素,executeConfigListen 配置监听方法才会继续往下处理

NacosConfigService#getConfig 方法

从 Nacos 获取配置信息优先级如下

  1. 从本地故障转移 faillover 文件中获取内容,存在直接通过该方法获取配置信息,该文件不由客户端程序去进行维护,而是由我们开发人员手动为它去添加,避免出现客户端紧急重启且同一时间更改了配置、Nacos 服务端下线情况出现
  2. 向 Nacos 服务端发出 ConfigQueryRequest 请求拉取配置信息,服务端响应 ConfigResponse,获取 context 内容返回
  3. 若服务端返回的不是鉴权错误时,就从本地快照文件中获取配置内容,在客户端同步到配置以后,会往本地快照文件中存储一份数据,当配置发生新的改变时,会对该快照文件进行重写

getConfig 方法的内部由 getConfigInner 方法实现,源码如下:

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
  group = blank2defaultGroup(group);
  ParamUtils.checkKeyParam(dataId, group);
  ConfigResponse cr = new ConfigResponse();
  cr.setDataId(dataId);
  cr.setTenant(tenant);
  cr.setGroup(group);
  // 先从本地故障转移文件获取内容,由客户端程序维护,而是由用户自身去维护的
  // 这是专为特定场景,例如:客户端-紧急重启此时在同一时间改变了配置 或 Nacos 服务器下线时
  String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  }
  try {
    // ConfigRpcTransportClient#queryConfig
    ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
    cr.setContent(response.getContent());
    cr.setEncryptedDataKey(response.getEncryptedDataKey());
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
  } catch (NacosException ioe) {
    if (NacosException.NO_RIGHT == ioe.getErrCode()) {
      throw ioe;
    }
    LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                worker.getAgentName(), dataId, group, tenant, ioe.toString());
  }
  // 从本地快照中获取内容
  content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
  if (content != null) {
    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
  }
  cr.setContent(content);
  // 获取本地加密后的缓存文件,空代表本地不存在文件或抛出异常
  String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
  cr.setEncryptedDataKey(encryptedDataKey);
  configFilterChainManager.doFilter(null, cr);
  content = cr.getContent();
  return content;
}

在这里主要分析 ConfigRpcTransportClient#queryConfig 方法是如何去往客户端进行拉取的

  1. 获取到处理该请求任务的 Grpc 客户端调用实例
  2. 通过 Grpc 实例向服务端发起 ConfigQueryRequest 请求,在服务端侧由 ConfigQueryRequestHandler 类从持久化层或本地磁盘文件中获取后进行处理,返回读取到的配置信息
  3. 基于服务端返回的配置信息先存储到本地快照文件中以后再进行返回
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
  throws NacosException {
  ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
  request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
  // 创建一个基于 Grpc 调用的客户端
  RpcClient rpcClient = getOneRunningClient();
  if (notify) {
    CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
    if (cacheData != null) {
      rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
    }
  }
  // 基于创建好的客户端连接向服务端发起请求
  ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
  ConfigResponse configResponse = new ConfigResponse();
  if (response.isSuccess()) {
    // 将服务端响应成功的配置内容存入本地快照中
    LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
    configResponse.setContent(response.getContent());
    String configType;
    if (StringUtils.isNotBlank(response.getContentType())) {
      configType = response.getContentType();
    } else {
      configType = ConfigType.TEXT.getType();
    }
    configResponse.setConfigType(configType);
    String encryptedDataKey = response.getEncryptedDataKey();
    LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
    configResponse.setEncryptedDataKey(encryptedDataKey);
    return configResponse;
    .....
}

NacosConfigService#addListener 方法

该方法会涉及到往 CacheMap 缓存集合新增 CacheData 元素,在 NacosConfigService 核心类加载过程中提到了 executeConfigListen 方法,其实它就会遍历 CacheMap 中元素,挨个进行判别配置是否发生变更,以便来向 CacheData 里的监听器进行回调,让客户端重新拉取最新的配置信息!

由客户端搬运工来执行 ClientWorker#addTenantListeners 方法,源码如下:

// NacosConfigService#addListener -> ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
  throws NacosException {
  group = blank2defaultGroup(group);
  String tenant = agent.getTenant();
  // 若当前 dataId-group-tenant 不存在时,往 CacheMap 缓存中添加元素
  CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
  synchronized (cache) {
    // 塞入监听器组、队列元素
    for (Listener listener : listeners) {
      cache.addListener(listener);
    }
    cache.setSyncWithServer(false);
    // 往 listenExecutebell 阻塞队列中塞入元素
    agent.notifyListenConfig();
  }
}
  1. 往 CacheMap 集合中新增 CacheData 元素
  2. 往 CacheData 实例中添加当前客户端传入的监听器
  3. 往 ConfigRpcTransportClient#listenExecutebell 阻塞队列中塞入元素,此时搬运工开辟的线程就会从队列中取到元素,执行 executeConfigListen 方法

接下来继续分析一下 addCacheDataIfAbsent 方法是如何处理的

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
  CacheData cache = getCache(dataId, group, tenant);
  if (null != cache) {
    return cache;
  }
  String key = GroupKey.getKeyTenant(dataId, group, tenant);
  synchronized (cacheMap) {
    CacheData cacheFromMap = getCache(dataId, group, tenant);
    // 避免在相同 data-id、group 存在竞争,进行双重检查
    if (null != cacheFromMap) {
      cache = cacheFromMap;
      // reset so that server not hang this check
      cache.setInitializing(true);
    } else {
      cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
      // ParamUtil#getPerTaskConfigSize = 3000
      int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
      cache.setTaskId(taskId);
      // fix issue # https://github.com/alibaba/nacos/issues/1317
      // 为了解决旧版本,重复加载配置信息的过程
      if (enableRemoteSyncConfig) {
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        cache.setContent(response.getContent());
      }
    }
    // 更新 CacheMap 值
    Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
    copy.put(key, cache);
    cacheMap.set(copy);
  }
  LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
  MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
  return cache;
}

若 CacheMap 集合存在此【dataId、group、tenant 作为唯一标识】元素,直接返回,否则创建新的 CacheData 实例,存入到 CacheMap 中


目录
相关文章
|
2月前
|
负载均衡 算法 Java
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
40岁老架构师尼恩分享了关于SpringCloud核心组件的底层原理,特别是针对蚂蚁集团面试中常见的面试题进行了详细解析。内容涵盖了Nacos注册中心的AP/CP模式、Distro和Raft分布式协议、Sentinel的高可用组件、负载均衡组件的实现原理等。尼恩强调了系统化学习的重要性,推荐了《尼恩Java面试宝典PDF》等资料,帮助读者更好地准备面试,提高技术实力,最终实现“offer自由”。更多技术资料和指导,可关注公众号【技术自由圈】获取。
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
|
4月前
|
安全 Nacos 数据安全/隐私保护
【技术干货】破解Nacos安全隐患:连接用户名与密码明文传输!掌握HTTPS、JWT与OAuth2.0加密秘籍,打造坚不可摧的微服务注册与配置中心!从原理到实践,全方位解析如何构建安全防护体系,让您从此告别数据泄露风险!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心,但其连接用户名和密码的明文传输成为安全隐患。本文探讨加密策略提升安全性。首先介绍明文传输风险,随后对比三种加密方案:HTTPS简化数据保护;JWT令牌减少凭证传输,适配分布式环境;OAuth2.0增强安全,支持多授权模式。每种方案各有千秋,开发者需根据具体需求选择最佳实践,确保服务安全稳定运行。
344 0
|
4月前
|
Java Nacos 开发工具
【Nacos】心跳断了怎么办?!8步排查法+实战代码,手把手教你解决Nacos客户端不发送心跳检测问题,让服务瞬间恢复活力!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心。然而,“客户端不发送心跳检测”的问题时有发生,可能导致服务实例被视为离线。本文介绍如何排查此类问题:确认Nacos服务器地址配置正确;检查网络连通性;查看客户端日志;确保Nacos SDK版本兼容;调整心跳检测策略;验证服务实例注册状态;必要时重启应用;检查影响行为的环境变量。通过这些步骤,通常可定位并解决问题,保障服务稳定运行。
248 0
|
4月前
|
关系型数据库 MySQL Java
“惊呆了!无需改动Nacos源码,轻松实现SGJDBC连接MySQL?这操作太秀了,速来围观,错过等哭!”
【8月更文挑战第7天】在使用Nacos进行服务治理时,常需连接MySQL存储数据。使用特定的SGJDBC驱动连接MySQL时,一般无需修改Nacos源码。需确保SGJDBC已添加至类路径,并在Nacos配置文件中指定使用SGJDBC的JDBC URL。示例中展示如何配置Nacos使用MySQL及SGJDBC,并在应用中通过Nacos API获取配置信息建立数据库连接,实现灵活集成不同JDBC驱动的目标。
119 0
|
5月前
|
网络安全 Nacos
Nacos客户端配置错误检查
Nacos客户端配置错误检查
189 3
|
6月前
|
缓存 监控 Java
深入解析Nacos配置中心的动态配置更新技术
深入解析Nacos配置中心的动态配置更新技术
|
5月前
|
Kubernetes 监控 Java
有了k8s还需要gateway网关,nacos配置中心吗
在Kubernetes环境中,服务网关(如Spring Cloud Gateway)和Nacos配置中心补充了k8s的不足。Nacos提供灵活服务路由和动态配置更新,超越k8s基础服务发现。它还支持更复杂的配置管理和实时推送,以及环境隔离和版本控制。作为服务注册中心,Nacos增强k8s服务治理能力,保持技术一致性,并提供额外的安全层及监控功能。
263 0
|
5月前
|
缓存 网络安全 Nacos
登录nacos客户端提示no message available
登录nacos客户端提示no message available
|
23天前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
52 1
Nacos配置中心
|
19天前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。