Nacos 配置中心源码分析

简介: Nacos 配置中心源码分析

本文主要和大家一起以源码的角度来分析 Nacos 配置中心的配置信息获取,以及配置信息动态同步的过程和原理。环境介绍和使用 环境介绍:


  • Jdk 1.8
  • nacos-server-1.4.2
  • spring-boot-2.3.5.RELEASE
  • spring-cloud-Hoxton.SR8
  • spring-cloiud-alibab-2.2.5.RELEASE


如果我们需要使用 Nacos 作为配置中心,我们首先需要导入 Nacos Config 的依赖信息,如下所示:


<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>


然后再 bootstartp.yml 文件中配置 Nacos 服务信息。


spring:
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848


客户端初始化


主要是通过 NacosConfigBootstrapConfiguration 类来进行初始化

NacosConfigManager 、NacosPropertySourceLocator


@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {
 @Bean
 @ConditionalOnMissingBean
 public NacosConfigManager nacosConfigManager(
   NacosConfigProperties nacosConfigProperties) {
  return new NacosConfigManager(nacosConfigProperties);
 }
    @Bean
 public NacosPropertySourceLocator nacosPropertySourceLocator(
   NacosConfigManager nacosConfigManager) {
  return new NacosPropertySourceLocator(nacosConfigManager);
 }
    // ...
}


在 NacosConfigManager 的构造方法中会调用 createConfigService 方法来创建 ConfigService 实例,内部调用工厂方法 ConfigFactory#createConfigService 通过反射实例化一个com.alibaba.nacos.client.config.NacosConfigService 的实例对象。


public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}


NacosPropertySourceLocator 继承 PropertySourceLocator(PropertySourceLocator接口支持扩展自定义配置加载到 Spring Environment中)通过 locate 加载配置信息。


@Override
 public PropertySource<?> locate(Environment env) {
  nacosConfigProperties.setEnvironment(env);
  ConfigService configService = nacosConfigManager.getConfigService();
  if (null == configService) {
   log.warn("no instance of config service found, can't load config from nacos");
   return null;
  }
  long timeout = nacosConfigProperties.getTimeout();
  nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
    timeout);
  String name = nacosConfigProperties.getName();
  String dataIdPrefix = nacosConfigProperties.getPrefix();
  if (StringUtils.isEmpty(dataIdPrefix)) {
   dataIdPrefix = name;
  }
  if (StringUtils.isEmpty(dataIdPrefix)) {
   dataIdPrefix = env.getProperty("spring.application.name");
  }
  CompositePropertySource composite = new CompositePropertySource(
    NACOS_PROPERTY_SOURCE_NAME);
        // 共享配置
  loadSharedConfiguration(composite);
  // 拓展配置
        loadExtConfiguration(composite);
  // 应用配置
        loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
  return composite;
 }


配置读取过程


配置加载有三个方法 loadSharedConfiguration、loadSharedConfiguration、 loadApplicationConfiguration 以 loadApplicationConfiguration 继续跟进。


private void loadApplicationConfiguration(
    CompositePropertySource compositePropertySource, String dataIdPrefix,
    NacosConfigProperties properties, Environment environment) {
    String fileExtension = properties.getFileExtension();
    String nacosGroup = properties.getGroup();
    // load directly once by default
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
                           fileExtension, true);
    // load with suffix, which have a higher priority than the default
    loadNacosDataIfPresent(compositePropertySource,
                           dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
    // Loaded with profile, which have a higher priority than the suffix
    for (String profile : environment.getActiveProfiles()) {
        String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
        loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
                               fileExtension, true);
    }
}


主要通过 loadNacosDataIfPresent 读取配置信息, 其实我们可以通过参数看出,主要配置文件包含以下部分:dataId, group, fileExtension


private void loadNacosDataIfPresent(final CompositePropertySource composite,
                                    final String dataId, final String group, String fileExtension,
                                    boolean isRefreshable) {
    if (null == dataId || dataId.trim().length() < 1) {
        return;
    }
    if (null == group || group.trim().length() < 1) {
        return;
    }
    NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
                                                                      fileExtension, isRefreshable);
    this.addFirstPropertySource(composite, propertySource, false);
}


然后调用 loadNacosPropertySource 最后一步步的会调用到 NacosConfigService#getConfigInner


private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
        // 优先使用本地配置
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                    dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        try {
            // 获取远程配置
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                    agent.getName(), dataId, group, tenant, ioe.toString());
        }
        LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                dataId, group, tenant, ContentUtils.truncateContent(content));
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }


加载远程配置


worker.getServerConfig 主要是获取远程配置, ClIentWorker 的 getServerConfig 定义如下:


public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
    throws NacosException {
    String[] ct = new String[2];
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        String message = String
            .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ex);
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }
    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            ct[0] = result.getData();
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                ct[1] = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                ct[1] = ConfigType.TEXT.getType();
            }
            return ct;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return ct;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                                     "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(),
                         dataId, group, tenant);
            throw new NacosException(result.getCode(), result.getMessage());
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(),
                         dataId, group, tenant, result.getCode());
            throw new NacosException(result.getCode(),
                                     "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
                                     + tenant);
        }
    }
}


agent 默认使用 MetricsHttpAgent 实现类


配置同步过程


Nacos 配置同步过程如下图所示:


640.png

客户端请求


客户端初始请求配置完成后,会通过 WorkClient 进行长轮询查询配置, 它的构造方法如下:


public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        // Initialize the timeout parameter
        init(properties);
        // 检查线程池
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // 长轮询线程
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }


这里初始化了两个线程池:


  • 第一个线程池主要是用来初始化做长轮询的;
  • 第二个线程池使用来做检查的,会每间隔 10 秒钟执行一次检查方法 checkConfigInfo


checkConfigInfo


在这个方法里面主要是分配任务,给每个 task 分配一个 taskId , 后面会去检查本地配置和远程配置,最终调用的是 LongPollingRunable 的 run 方法。


public void checkConfigInfo() {
    // Dispatch taskes.
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}


LongPollingRunnable


长轮询线程实现,首先第一步检查本地配置信息,然后通过 dataId 去检查服务端是否有变动的配置信息,如果有就更新下来然后刷新配置。


public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            // 触发回调
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }
            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                agent.getName(), dataId, group, tenant, cache.getMd5(),
                                ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            executorService.execute(this);
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}


addTenantListeners


添加监听,这里主要是通过 dataId , group 来获取 cache 本地缓存的配置信息,然后再将 Listener 也传给 cache 统一管理。


public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}


回调触发


如果 md5 值发生变化过后就会调用 safeNotifyListener 方法然后将配置信息发送给对应的监听器


void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}


服务端响应


当服务端收到请求后,会 hold 住当前请求,如果有变化就返回,如果没有变化就等待超时之前返回无变化。


/**
 * The client listens for configuration changes.
 */
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}


LongPollingService


核心处理类 LongPollingService


/**
     * Add LongPollingClient.
     *
     * @param req              HttpServletRequest.
     * @param rsp              HttpServletResponse.
     * @param clientMd5Map     clientMd5Map.
     * @param probeRequestSize probeRequestSize.
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);
        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
相关文章
|
16小时前
|
Nacos
nacos 配置页面的模糊查询
nacos 配置页面的模糊查询
|
16小时前
|
机器学习/深度学习 Java Nacos
Nacos 配置中心(2023旧笔记)
Nacos 配置中心(2023旧笔记)
21 0
|
16小时前
|
存储 前端开发 Java
第十一章 Spring Cloud Alibaba nacos配置中心
第十一章 Spring Cloud Alibaba nacos配置中心
26 0
|
16小时前
|
敏捷开发 API 持续交付
云效产品使用常见问题之把云效上的配置发到Nacos上面去如何解决
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
16小时前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
16小时前
|
Nacos
nacos手动创建配置命名空间隔离
nacos手动创建配置命名空间隔离
25 1
|
16小时前
|
编解码 Java Nacos
nacos常见问题之密码加密配置如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
179 0
|
16小时前
|
安全 前端开发 Nacos
nacos常见问题之配置注册的白名单如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
133 0
|
16小时前
|
网络安全 Nacos 数据安全/隐私保护
nacos常见问题之配置内容不显示也修改不了如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
161 0
|
16小时前
|
运维 Java Nacos
nacos常见问题之配置不生效不加载shared-configs 配置如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
194 0

热门文章

最新文章