Nacos 客户端配置中心从浅入深原理及源码剖析(下)

简介: Nacos 客户端配置中心从浅入深原理及源码剖析(下)

ConfigRpcTransportClient#executeConfigListen 核心方法

执行该方法的准备工作都有了,CacheMap 存在元素,阻塞队列中能获取到元素,就会执行 executeConfigListen 方法,对监听的配置信息进行检查,来保证客户端的配置信息是最新的!

该方法核心处理的工作分为以下几步

  1. isSyncWithServer = true 监听器进行一次预检查,代表它已经向服务端同步一次了,但它默认值一般就是 false,首次进来都需要向服务端进行一次同步配置操作
  2. 遍历 CacheMap 集合中的元素,若 CacheData 中的 listener 不为空,存入到 listenCachesMap 集合,否则存入到 removeListenCachesMap 集合中
  3. 遍历 listenCachesMap 集合元素,组装生成 takId 的 RpcClient,向服务端发起 ConfigBatchListenRequest 请求,会做以下几件事情
  • 客户端传入当前配置文件的 md5 值、属性:listen=true
  • 服务端会将传入的监听器-key->groupKey、value->connectionId,key->connectionId、value->groupKey 关系进行绑定
  • 若 dataId-group-tenant 组合配置的 md5 值发生了改变,就把当前更改的数据进行返回
  • 客户端此时对返回的配置文件数据进行再次 md5 比对,若不一致的话就会执行监听器的回调方法
  • 修改当前的 CacheData 数据属性,lastModifiedTs=当前时间戳、isSyncWithServer=true
  1. 遍历 removeListenCachesMap 集合,组装生成 takId 的 RpcClient,向服务端发起 ConfigBatchListenRequest 请求,做以下几件事情
  • 客户端传入当前配置文件的 md5 值、属性:listen=false
  • 服务端会将传入的监听器-key->groupKey、value->connectionId,key->connectionId、value->groupKey 关系进行移除
  • 最后,从 CacheMap 集合中移除当前 CacheData 元素
  1. 判别 needAllSync 是否为 true(代表全量同步,五分钟会执行一次全量同步操作),若为 true,更新 lastAllSyncTime 最后同步时间为当前时间
  2. 若配置发生了变化,就往 listenExecutebell 阻塞队列中塞入元素
public void executeConfigListen() {
  Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
  Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
  long now = System.currentTimeMillis();
  // 超出五分钟以后,进行全量配置信息的同步
  boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
  for (CacheData cache : cacheMap.get().values()) {
    synchronized (cache) {
      // isSyncWithServer 在后面会调整为 true,初始值都是 false
      if (cache.isSyncWithServer()) {
        // 检查监听器 md5 值
        cache.checkListenerMd5();
        if (!needAllSync) {
          continue;
        }
      }
      // 监听器不为空,添加到待监听器集合中
      if (!CollectionUtils.isEmpty(cache.getListeners())) {
        // 不使用本地配置,默认值为 false
        if (!cache.isUseLocalConfigInfo()) {
          List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
          if (cacheDatas == null) {
            cacheDatas = new LinkedList<>();
            listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
          }
          cacheDatas.add(cache);
        }
        // 监听器为空,添加到待移除监听器集合中
      } else if (CollectionUtils.isEmpty(cache.getListeners())) {
        if (!cache.isUseLocalConfigInfo()) {
          List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
          if (cacheDatas == null) {
            cacheDatas = new LinkedList<>();
            removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
          }
          cacheDatas.add(cache);
        }
      }
    }
  }
  boolean hasChangedKeys = false;
  // 若监听器集合不为空,进行遍历处理
  if (!listenCachesMap.isEmpty()) {
    for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
      String taskId = entry.getKey();
      Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
      List<CacheData> listenCaches = entry.getValue();
      for (CacheData cacheData : listenCaches) {
        timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
                         cacheData.getLastModifiedTs().longValue());
      }
      ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
      configChangeListenRequest.setListen(true);
      try {
        // 创建当前任务的 RpcClient 实例
        RpcClient rpcClient = ensureRpcClient(taskId);
        // 向服务端发起请求,超时时长 3 秒
        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
          rpcClient, configChangeListenRequest);
        if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
          Set<String> changeKeys = new HashSet<>();
          // 若有配置发生了改变,检查后通知监听器
          if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
            hasChangedKeys = true;
            for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
              // 生成改变 Key,添加到集合中
              String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
              changeKeys.add(changeKey);
              boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
              refreshContentAndCheck(changeKey, !isInitializing);
            }
          }
          // 处理配置内容
          for (CacheData cacheData : listenCaches) {
            String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
            if (!changeKeys.contains(groupKey)) {
              //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
              synchronized (cacheData) {
                if (!cacheData.getListeners().isEmpty()) {
                  // 通过 CAS 调整最后更新时间,避免其他地方同时对这个数据发生了修改
                  Long previousTimesStamp = timestampMap.get(groupKey);
                  if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
                    continue;
                  }
                  // isSyncWithServer 修改为 true
                  cacheData.setSyncWithServer(true);
                }
              }
            }
            cacheData.setInitializing(false);
          }
        }
      } catch (Exception e) {
        LOGGER.error("Async listen config change error ", e);
        try {
          Thread.sleep(50L);
        } catch (InterruptedException interruptedException) {
          //ignore
        }
      }
    }
  }
  // 若待移除的监听器集合不为空,进行遍历处理
  if (!removeListenCachesMap.isEmpty()) {
    for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
      String taskId = entry.getKey();
      List<CacheData> removeListenCaches = entry.getValue();
      ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
      configChangeListenRequest.setListen(false);
      try {
        RpcClient rpcClient = ensureRpcClient(taskId);
        // 取消-移除监听器
        boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
        if (removeSuccess) {
          for (CacheData cacheData : removeListenCaches) {
            synchronized (cacheData) {
              if (cacheData.getListeners().isEmpty()) {
                // 移除当前 CacheData
                ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
              }
            }
          }
        }
      } catch (Exception e) {
        LOGGER.error("async remove listen config change error ", e);
      }
      try {
        Thread.sleep(50L);
      } catch (InterruptedException interruptedException) {
        //ignore
      }
    }
  }
  if (needAllSync) {
    lastAllSyncTime = now;
  }
  //If has changed keys,notify re sync md5.
  if (hasChangedKeys) {
    notifyListenConfig();
  }
}

isSyncWithServer 属性比较重要,它在以下几种情况下都会是 false

/**
 * 1、添加监听器默认值是 false,需要检查
 * 2、收到配置改变的通知,设置为 false,需要检查
 * 3、监听器被移除,设置为 false,需要检查
 */
public boolean isSyncWithServer() {
  return isSyncWithServer;

首次进来先要进行一次与服务端的数据比对过程,然后把该标识修改为 true,防止在下一个 5s 对没有更改过的配置再次调用服务端,浪费资源;当在 Nacos 控制台或通过接口方式修改了配置,服务端就会推送给客户端一个状态,也就是会把这个属性再次修改为 false,下一次就会主动去拉取服务端的数据进行 md5 值比对,主动拉取配置的方法源码如下:

private void refreshContentAndCheck(String groupKey, boolean notify) {
  if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
    CacheData cache = cacheMap.get().get(groupKey);
    refreshContentAndCheck(cache, notify);
  }
}
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
  try {
    // 获取服务端该配置文件下的最新信息
    ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                                              notify);
    cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
    cacheData.setContent(response.getContent());
    if (null != response.getConfigType()) {
      cacheData.setType(response.getConfigType());
    }
    if (notify) {
      LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                  agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                  ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    }
    // 此处进行最新数据的 md5 值比对
    cacheData.checkListenerMd5();
  } catch (Exception e) {
    LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                 cacheData.group, cacheData.tenant, e);
  }
}

通过 CacheData#checkListenerMd5 方法去对 md5 值比对,在该方法中会通知 listener 监听器的持有者

void checkListenerMd5() {
  // 遍历监听器,比对 MD5 值,不同的话就调用监听器回调方法告知配置变更了!
  for (ManagerListenerWrap wrap : listeners) {
    if (!md5.equals(wrap.lastCallMd5)) {
      safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
    }
  }
}
// 摘自 safeNotifyListener 方法中的部分源码
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
// 会调用 innerReceive 方法,触发客户端侧回调逻辑
listener.receiveConfigInfo(contentTmp);

从以上方法中就能够知晓监听器的持有者是如何感知到配置变化的

至于服务端是如何感知到客户端这些监听器的存在,它是通过 ConfigBatchListenRequest 请求传递的,主要看的是 listen 这个属性值,若为 true 服务端就会有一个集合将其维护起来,为 false 服务端就会从集合中将其移除

RpcClient 长连接

以上方法提到了会为每一组监听器创建 RpcClient,它由 RpcClient rpcClient = ensureRpcClient(taskId) 方法处理,如下是它的源码:

private RpcClient ensureRpcClient(String taskId) throws NacosException {
  synchronized (ClientWorker.this) {
    Map<String, String> labels = getLabels();
    Map<String, String> newLabels = new HashMap<>(labels);
    newLabels.put("taskId", taskId);
    // 创建一个基于 GRPC 类型的客户端
    RpcClient rpcClient = RpcClientFactory.createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
    if (rpcClient.isWaitInitiated()) {
      // 初始化客户端的处理器
      initRpcClientHandler(rpcClient);
      rpcClient.setTenant(getTenant());
      rpcClient.clientAbilities(initAbilities());
      rpcClient.start();
    }
    return rpcClient;
  }
}

主要看的就是 initRpcClientHandler 方法做的事情,源码如下:

private void initRpcClientHandler(final RpcClient rpcClientInner) {
  /*
   * 处理来自服务端发起的请求
   */
  rpcClientInner.registerServerRequestHandler((request) -> {
    // 当前请求为配置更新通知请求,由 Nacos 服务端发起
    if (request instanceof ConfigChangeNotifyRequest) {
      ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
      LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                  rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
                  configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
      String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),configChangeNotifyRequest.getTenant());
      CacheData cacheData = cacheMap.get().get(groupKey);
      if (cacheData != null) {
        synchronized (cacheData) {
          cacheData.getLastModifiedTs().set(System.currentTimeMillis());
          // isSyncWithServer 更新这个值为 false,就会再次调用服务端拉取配置
          cacheData.setSyncWithServer(false);
          // listenExecutebell 阻塞队列塞入数据,进行下一个 5s 触发调用,更新配置信息 md5 值,进行比对后客户端侧监听回调处理
          notifyListenConfig();
        }
      }
      return new ConfigChangeNotifyResponse();
    }
    return null;
  });

initRpcClientHandler 方法会接收来自服务端 ConfigChangeNotifyRequest 请求,服务端发布了 LocalDataChangeEvent 事件时会发送此请求,具体的流程在 Nacos 配置中心服务端原理及源码剖析 博文有详细剖析

RpcClient#start 方法及其重要,它在内部会去与 Nacos 服务端建立连接,随即在后面通过 RpcClient 去调用时,服务端才能够拿到当前客户端的元数据信息,确保两边通信是安全态,它内部开启了两个线程去定时监测与服务端之间的连接状态!

// com.alibaba.nacos.common.remote.client.RpcClient#start
// 创建两个核心线程数
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
  Thread t = new Thread(r);
  t.setName("com.alibaba.nacos.client.remote.worker");
  t.setDaemon(true);
  return t;
});
// 该线程用于收到连接成功或失败的状态,及时回调监听器处理连接成功、连接失败对应的方法
clientEventExecutor.submit(() -> {
  while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
    ConnectionEvent take;
    try {
      take = eventLinkedBlockingQueue.take();
      if (take.isConnected()) {
        notifyConnected();
      } else if (take.isDisConnected()) {
        notifyDisConnected();
      }
    } catch (Throwable e) {
      // Do nothing
    }
  }
});
// 该线程用于定时去监测连接是否可靠,若之前服务端下线了,该 RpcClient 5 秒后会去
clientEventExecutor.submit(() -> {
  while (true) {
    try {
      if (isShutdown()) {
        break;
      }
      // 阻塞 5 秒后放行,也就是 5 秒进行一次心跳监测
      ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
      if (reconnectContext == null) {
        // 检查存活时长是否超过 5 s
        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
          boolean isHealthy = healthCheck();
          if (!isHealthy) {
            // 连接不健康并且当前连接是空就不处理
            if (currentConnection == null) {
              continue;
            }
            LoggerUtils.printIfInfoEnabled(LOGGER,
                                           "[{}] Server healthy check fail, currentConnection = {}", name,
                                           currentConnection.getConnectionId());
            // 获取当前 Rpc 状态,若是终止状态就退出
            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
              break;
            }
            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
              .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (statusFLowSuccess) {
              reconnectContext = new ReconnectContext(null, false);
            } else {
              continue;
            }
          } else {
            // 更新时间
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
          }
        } else {
          continue;
        }
      }
      if (reconnectContext.serverInfo != null) {
        // clear recommend server if server is not in server list.
        boolean serverExist = false;
        for (String server : getServerListFactory().getServerList()) {
          ServerInfo serverInfo = resolveServerInfo(server);
          if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
          }
        }
        if (!serverExist) {
          LoggerUtils.printIfInfoEnabled(LOGGER,
                                         "[{}] Recommend server is not in server list, ignore recommend server {}", name,
                                         reconnectContext.serverInfo.getAddress());
          reconnectContext.serverInfo = null;
        }
      }
      // 重新建立新的连接,选择另外一个 UP 服务端节点进行连接,同时会往 eventLinkedBlockingQueue 队列中塞数据进行连接回调通知
      reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
    } catch (Throwable throwable) {
      // Do nothing
    }
  }
});
// 首次启动 RpcClient 会建立与服务端的连接,担保重试三次与服务端建立连接的机会
int startUpRetryTimes = RETRY_TIMES;
while (startUpRetryTimes > 0 && connectToServer == null) {
  try {
    startUpRetryTimes--;
    ServerInfo serverInfo = nextRpcServer();
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                                   serverInfo);
    connectToServer = connectToServer(serverInfo);
  } catch (Throwable e) {
    LoggerUtils.printIfWarnEnabled(LOGGER,
                                   "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                                   name, e.getMessage(), startUpRetryTimes, e);
  }
}

在这里总结一下,调用 RpcClient#start 方法,会创建两个线程

  1. 一个线程来负责监听连接状态,一直阻塞,直到 eventLinkedBlockingQueue 队列中存在元素,无论连接成功还是失败都会通知监听器去处理对应的回调方法,notifyConnected、notifyDisConnected
  2. 一个线程来通过 5s 定时监测的机制来确保客户端这边与服务端之间的通信是正常的,在集群模式下,若当前连接的服务节点不正常了,那么就会挑选另外一台服务节点进行重连,以确保客户端这边的正常操作,每次重连成功会往 eventLinkedBlockingQueue 塞入事件:ConnectionEvent.CONNECTED,若重连失败 塞入事件:ConnectionEvent.DISCONNECTED
  3. RpcClient 在最尾部在初始建立与服务节点的连接,同时往 eventLinkedBlockingQueue 塞入事件:ConnectionEvent.CONNECTED

总结

Nacos 2.x 抛弃了 1.x 采用的长轮询模式,替而代之的是长连接模式,通过心跳的机制定时去监测服务端实例状态是否正常,若不正常了,切换到另外一个服务端实例

客户端感应到了主动去拉取最新的配置数据进行 md5 比对,若与之前的 md5 不一样了,此时将发生变化的状态通知到 Listener 持有者,此时,持有者就会重新从 Nacos 服务端拿到最新的数据保存下来,在这里总结一下 Nacos 客户端关于配置这块内容应用到的轮询方案:

  1. Nacos Config Client 每 5 分钟会进行一次全量比对,有没有配置发生了变化
  2. Nacos Config Client 每 5 秒钟会去扫描一次,当前 CacheMap 集合元素是不是发生了配置变化

关于 Nacos 2.x 在架构上改进方案的更多内容解读,可以看这篇文章:支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型

博文主要介绍的是 Nacos 客户端这一侧的拉取是如何去工作的,同时提及到了 Nacos 服务端部分内容,关于 Nacos 服务端这一块推送的内容会单独有一篇文章进行详细解析!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!



目录
相关文章
|
2天前
|
SpringCloudAlibaba 应用服务中间件 Nacos
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心(下)
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心
14 0
|
2天前
|
JSON SpringCloudAlibaba Java
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心(上)
【微服务 SpringCloudAlibaba】实用篇 · Nacos配置中心
16 1
|
2天前
|
Nacos
nacos 配置页面的模糊查询
nacos 配置页面的模糊查询
|
2天前
|
机器学习/深度学习 Java Nacos
Nacos 配置中心(2023旧笔记)
Nacos 配置中心(2023旧笔记)
21 0
|
2天前
|
存储 前端开发 Java
第十一章 Spring Cloud Alibaba nacos配置中心
第十一章 Spring Cloud Alibaba nacos配置中心
28 0
|
2天前
|
敏捷开发 API 持续交付
云效产品使用常见问题之把云效上的配置发到Nacos上面去如何解决
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
2天前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
2天前
|
关系型数据库 MySQL Nacos
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
61 1
|
2天前
|
Nacos
nacos手动创建配置命名空间隔离
nacos手动创建配置命名空间隔离
25 1
|
2天前
|
编解码 Java Nacos
nacos常见问题之密码加密配置如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
191 0

热门文章

最新文章