Nacos配置中心之客户端长轮询

简介: Nacos配置中心之客户端长轮询

Nacos配置中心之客户端长轮询

客户端长轮询定时任务是在NacosFactory的createConfigService构建ConfigService对象实例的时候启动的

createConfigService

public static ConfigService createConfigService(String serverAddr) throws NacosException {
    return ConfigFactory.createConfigService(serverAddr);
}
public class ConfigFactory {

    /**
     * Create Config
     *
     * @param properties init param
     * @return ConfigService
     * @throws NacosException Exception
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }

    /**
     * Create Config
     *
     * @param serverAddr serverList
     * @return Config
     * @throws ConfigService Exception
     */
    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return createConfigService(properties);
    }

}
  1. 通过Class.forName加载NacosConfigService类
  2. 使用反射来完成NacosConfigService类的实例化

NacosConfigService构造

NacosConfigService构造方法:

public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty("encode");
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = "UTF-8";
    } else {
        this.encode = encodeTmp.trim();
    }

    this.initNamespace(properties);
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
  1. 初始化HttpAgent,使用了装饰器模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加监控统计信息
  2. ClientWorker是客户端的工作类,agent作为参数传入ClientWorker,用agent做一些远程调用

ClientWorker构造

ClientWorker的构造函数:

@SuppressWarnings("PMD.ThreadPoolCreationRule")
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);

        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
  1. 构建定时调度的线程池,第一个线程池executor只拥有一个核心线程,每隔10s执行一次checkConfigInfo()方法,功能就是每10ms检查一次配置信息
  2. 第二个线程池executorService只完成了初始化,后续用于客户端的定时长轮询功能。

checkConfigInfo方法:

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

检查配置是否发生变化

cacheMap用来存储监听变更的缓存集合,key是根据dataID/group/tenant拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。

默认情况下每个长轮询LongPollingRunnable任务处理3000个监听配置集,超过3000个启动多个LongPollingRunnable执行。

LongPollingRunnable

LongPollingRunnable是一个线程,我们可以直接找到LongPollingRunnable里面的run方法

class LongPollingRunnable implements Runnable {
    private int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.get().values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String content = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(content);
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(content));
                } catch (NacosException ioe) {
                    String message = String.format(
                        "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                        agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            executorService.execute(this);

        } catch (Throwable e) {

            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
  1. 遍历CacheData,检查本地配置,根据taskId对cacheMap进行数据分割,通过checkLocalConfig方法检查本地配置,本地在${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。
  2. 执行checkUpdateDataIds方法在服务端建立长轮询机制,通过长轮询检查数据变更。
  3. 遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据dataId,group,tenant去服务端读取对应的配置信息并保存到本地文件中。
  4. 继续定时执行当前线程

checkUpdateDataIds

checkUpdateDataIds基于长连接方式监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。

checkUpdateDataIds中调用checkUpdateConfigStr

/**
 * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
 */
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

    List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

    List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + timeout);

    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);

        if (HttpURLConnection.HTTP_OK == result.code) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.content);
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
        }
    } catch (IOException e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

checkUpdateConfigStr通过agent.httpPost调用/listener接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认30s。如果服务端的数据发生变更,客户端会收到HttpResult。服务端返回的是存在数据变更的dataId, group, tenant。获得这些信息后,在LongPollingRunnable的run方法中调用getServerConfig方法从Nacos服务器中读取具体的配置内容。

getServerConfig

从Nacos服务器中读取具体的配置内容:

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
    throws NacosException {
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }

    HttpResult result = null;
    try {
        List<String> params = null;
        if (StringUtils.isBlank(tenant)) {
            params = Arrays.asList("dataId", dataId, "group", group);
        } else {
            params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (IOException e) {
        String message = String.format(
            "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
            dataId, group, tenant);
        LOGGER.error(message, e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    switch (result.code) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
            return result.content;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return null;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                    + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
                group, tenant);
            throw new NacosException(result.code, result.content);
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
                group, tenant, result.code);
            throw new NacosException(result.code,
                "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
    }
}
相关文章
|
2月前
|
存储 网络协议 Nacos
高效搭建Nacos:实现微服务的服务注册与配置中心
Nacos(Dynamic Naming and Configuration Service)是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台。它旨在帮助开发者更轻松地构建、部署和管理分布式系统,特别是在微服务架构中。
444 81
高效搭建Nacos:实现微服务的服务注册与配置中心
|
2月前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
300 16
|
3月前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。
474 18
|
3月前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
198 1
Nacos配置中心
|
3月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评。然而,“客户端不发送心跳检测”是使用中常见的问题之一。本文详细探讨了该问题的原因及解决方法,包括检查客户端配置、网络连接、日志、版本兼容性、心跳检测策略、服务实例注册状态、重启应用及环境变量等步骤,旨在帮助开发者快速定位并解决问题,确保服务正常运行。
69 5
|
3月前
|
网络安全 Nacos 开发者
Nacos作为流行的微服务注册与配置中心,“节点提示暂时不可用”是常见的问题之一
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,“节点提示暂时不可用”是常见的问题之一。本文将探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务的正常运行。通过检查服务实例状态、网络连接、Nacos配置、调整健康检查策略等步骤,可以有效解决这一问题。
54 4
|
3月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,实际使用中常遇到“客户端不发送心跳检测”的问题。本文深入探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务正常运行。通过检查客户端配置、网络连接、日志、版本兼容性、心跳策略、注册状态、重启应用和环境变量等步骤,系统地排查和解决这一问题。
73 3
|
3月前
|
安全 Nacos 数据库
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改
Nacos是一款流行的微服务注册与配置中心,但直接暴露在公网中可能导致非法访问和数据库篡改。本文详细探讨了这一问题的原因及解决方案,包括限制公网访问、使用HTTPS、强化数据库安全、启用访问控制、监控和审计等步骤,帮助开发者确保服务的安全运行。
147 3
|
5月前
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
3月前
|
SQL 关系型数据库 数据库连接
"Nacos 2.1.0版本数据库配置写入难题破解攻略:一步步教你排查连接、权限和配置问题,重启服务轻松解决!"
【10月更文挑战第23天】在使用Nacos 2.1.0版本时,可能会遇到无法将配置信息写入数据库的问题。本文将引导你逐步解决这一问题,包括检查数据库连接、用户权限、Nacos配置文件,并提供示例代码和详细步骤。通过这些方法,你可以有效解决配置写入失败的问题。
176 0