前言
在之前文章一文带你从零到一深入透析 @RefreshScope 结合 Nacos 动态刷新源码,提及到了
- NacosPropertySourceLocator#locate 加载客户端应用配置信息时,会调用 NacosConfigService#getConfig 方法,去请求服务端从本地或数据库中获取配置
- NacosContextRefresher#onApplicationEvent 应用准备就绪后,会去调用 NacosConfigService#addListener 方法,它用于注册监听器,后续发生动态配置信息变更后,它会回调:1、进行环境对象属性信息重新加载,启动 bootstrap 父容器,再调用 getConfig 方法获取最新的配置信息;2、销毁 @RefreshScope 标注的 Bean 对象
在注册中心,Nacos 有核心类 NacosNamingService,而在配置中心,Nacos 有核心类 NacosConfigService
在这里,会分析以上两个方法在客户端是如何去处理交互的,以及在客户端比较核心的一个长轮询机制
NacosConfigService 核心类加载过程
在 Nacos 1.0 中,配置基于 Http 代理模拟长连接方式实现去拉取配置,而在 2.0 版本以后进行了架构改进,调整为了基于 Grpc 协议实现真正的长连接
在 NacosConfigService 构造方法中,实例化了一个 ClientWorker 客户端搬运工核心类,由它来负责长连接,保持与 Nacos 服务端配置的及时同步,具体源码如下:
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, final Properties properties) throws NacosException { this.configFilterChainManager = configFilterChainManager; init(properties); // Grpc 客户端配置调用类 agent = new ConfigRpcTransportClient(properties, serverListManager); int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE); // 创建一个定时任务线程池 ScheduledExecutorService executorService = Executors .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker"); t.setDaemon(true); return t; }); agent.setExecutor(executorService); agent.start(); }
它内部通过 ConfigRpcTransportClient 类(它连接类型为 Grpc)作为 agent 代理进行工作,先创建好线程数最少为 2 个定时调度线程池,再启动搬运工
public void start() throws NacosException { // 客户端请求鉴权 securityProxy.login(this.properties); // 五分钟进行一次鉴权 this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); // ConfigRpcTransportClient#startInternal startInternal(); }
先维护好客户端与服务端之间的鉴权工作,然后每间隔五秒钟进行一次数据的校验比对、同步获取最新配置信息
public void startInternal() { executor.schedule(() -> { while (!executor.isShutdown() && !executor.isTerminated()) { try { // 五秒钟定时进行一次拉取 listenExecutebell.poll(5L, TimeUnit.SECONDS); if (executor.isShutdown() || executor.isTerminated()) { continue; } executeConfigListen(); } catch (Exception e) { LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); } } }, 0L, TimeUnit.MILLISECONDS); }
从这里就是 NacosConfigService 类初始化做的准备工作,后续的工作会往 listenExecutebell 阻塞队列中塞入元素,executeConfigListen 配置监听方法才会继续往下处理
NacosConfigService#getConfig 方法
从 Nacos 获取配置信息优先级如下
- 从本地故障转移 faillover 文件中获取内容,存在直接通过该方法获取配置信息,该文件不由客户端程序去进行维护,而是由我们开发人员手动为它去添加,避免出现
客户端紧急重启且同一时间更改了配置、Nacos 服务端下线
情况出现 - 向 Nacos 服务端发出 ConfigQueryRequest 请求拉取配置信息,服务端响应 ConfigResponse,获取 context 内容返回
- 若服务端返回的不是鉴权错误时,就从本地快照文件中获取配置内容,在客户端同步到配置以后,会往本地快照文件中存储一份数据,当配置发生新的改变时,会对该快照文件进行重写
getConfig 方法的内部由 getConfigInner 方法实现,源码如下:
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { group = blank2defaultGroup(group); ParamUtils.checkKeyParam(dataId, group); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setTenant(tenant); cr.setGroup(group); // 先从本地故障转移文件获取内容,由客户端程序维护,而是由用户自身去维护的 // 这是专为特定场景,例如:客户端-紧急重启此时在同一时间改变了配置 或 Nacos 服务器下线时 String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant); if (content != null) { LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content)); cr.setContent(content); String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant); cr.setEncryptedDataKey(encryptedDataKey); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } try { // ConfigRpcTransportClient#queryConfig ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false); cr.setContent(response.getContent()); cr.setEncryptedDataKey(response.getEncryptedDataKey()); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } catch (NacosException ioe) { if (NacosException.NO_RIGHT == ioe.getErrCode()) { throw ioe; } LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", worker.getAgentName(), dataId, group, tenant, ioe.toString()); } // 从本地快照中获取内容 content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant); if (content != null) { LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content)); } cr.setContent(content); // 获取本地加密后的缓存文件,空代表本地不存在文件或抛出异常 String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant); cr.setEncryptedDataKey(encryptedDataKey); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; }
在这里主要分析 ConfigRpcTransportClient#queryConfig 方法是如何去往客户端进行拉取的
- 获取到处理该请求任务的 Grpc 客户端调用实例
- 通过 Grpc 实例向服务端发起 ConfigQueryRequest 请求,在服务端侧由 ConfigQueryRequestHandler 类从持久化层或本地磁盘文件中获取后进行处理,返回读取到的配置信息
- 基于服务端返回的配置信息先存储到本地快照文件中以后再进行返回
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException { ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant); request.putHeader(NOTIFY_HEADER, String.valueOf(notify)); // 创建一个基于 Grpc 调用的客户端 RpcClient rpcClient = getOneRunningClient(); if (notify) { CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); if (cacheData != null) { rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId())); } } // 基于创建好的客户端连接向服务端发起请求 ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts); ConfigResponse configResponse = new ConfigResponse(); if (response.isSuccess()) { // 将服务端响应成功的配置内容存入本地快照中 LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent()); configResponse.setContent(response.getContent()); String configType; if (StringUtils.isNotBlank(response.getContentType())) { configType = response.getContentType(); } else { configType = ConfigType.TEXT.getType(); } configResponse.setConfigType(configType); String encryptedDataKey = response.getEncryptedDataKey(); LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey); configResponse.setEncryptedDataKey(encryptedDataKey); return configResponse; ..... }
NacosConfigService#addListener 方法
该方法会涉及到往 CacheMap 缓存集合新增 CacheData 元素,在 NacosConfigService 核心类加载过程中提到了 executeConfigListen 方法,其实它就会遍历 CacheMap 中元素,挨个进行判别配置是否发生变更,以便来向 CacheData 里的监听器进行回调,让客户端重新拉取最新的配置信息!
由客户端搬运工来执行 ClientWorker#addTenantListeners 方法,源码如下:
// NacosConfigService#addListener -> ClientWorker#addTenantListeners public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException { group = blank2defaultGroup(group); String tenant = agent.getTenant(); // 若当前 dataId-group-tenant 不存在时,往 CacheMap 缓存中添加元素 CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); synchronized (cache) { // 塞入监听器组、队列元素 for (Listener listener : listeners) { cache.addListener(listener); } cache.setSyncWithServer(false); // 往 listenExecutebell 阻塞队列中塞入元素 agent.notifyListenConfig(); } }
- 往 CacheMap 集合中新增 CacheData 元素
- 往 CacheData 实例中添加当前客户端传入的监听器
- 往 ConfigRpcTransportClient#listenExecutebell 阻塞队列中塞入元素,此时搬运工开辟的线程就会从队列中取到元素,执行 executeConfigListen 方法
接下来继续分析一下 addCacheDataIfAbsent 方法是如何处理的
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException { CacheData cache = getCache(dataId, group, tenant); if (null != cache) { return cache; } String key = GroupKey.getKeyTenant(dataId, group, tenant); synchronized (cacheMap) { CacheData cacheFromMap = getCache(dataId, group, tenant); // 避免在相同 data-id、group 存在竞争,进行双重检查 if (null != cacheFromMap) { cache = cacheFromMap; // reset so that server not hang this check cache.setInitializing(true); } else { cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant); // ParamUtil#getPerTaskConfigSize = 3000 int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); cache.setTaskId(taskId); // fix issue # https://github.com/alibaba/nacos/issues/1317 // 为了解决旧版本,重复加载配置信息的过程 if (enableRemoteSyncConfig) { ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false); cache.setEncryptedDataKey(response.getEncryptedDataKey()); cache.setContent(response.getContent()); } } // 更新 CacheMap 值 Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get()); copy.put(key, cache); cacheMap.set(copy); } LOGGER.info("[{}] [subscribe] {}", agent.getName(), key); MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size()); return cache; }
若 CacheMap 集合存在此【dataId、group、tenant 作为唯一标识】元素,直接返回,否则创建新的 CacheData 实例,存入到 CacheMap 中