【Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器

简介: 在后台管理界面可以直接增删改查所有的配置数据,那么问题来了1. [x] 增删改数据这个操作,除了落库,还做了哪些操作?1新增配置数据打开后台新建一个配置

在后台管理界面可以直接增删改查所有的配置数据,那么问题来了

  1. [x] 增删改数据这个操作,除了落库,还做了哪些操作?

1新增配置数据


打开后台新建一个配置

  1. 如果是新增先访问Http请求ConfigController.getConfig()检验dataId,group等等是否已经存在,已经存在提示不让新增
  2. 不存在可以新增,则访问 ConfigController.publishConfig方法发布配置数据;

ConfigController.publishConfig 发布数据

/**
     * 增加或更新非聚合数据。
     *
     * @throws NacosException
     */
    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response)
        throws NacosException {
        //部分省略....
        if (AggrWhitelist.isAggrDataId(dataId)) {
            log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
                RequestUtil.getRemoteIp(request), dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }
  1. persistService.insertOrUpdate将配置信息持久化到数据库
  2. 发起配置数据有变化事件的通知
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

关于事件通知不懂的可以查看这篇文章  原 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式

ConfigDataChangeEvent这个事件只有AsyncNotifyService 监听了那好,我们主要看AsyncNotifyService做了哪些事情;

AsyncNotifyService 异步通知服务

@Override
    public void onEvent(Event event) {
        // 并发产生 ConfigDataChangeEvent
        if (event instanceof ConfigDataChangeEvent) {
            ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
            long dumpTs = evt.lastModifiedTs;
            String dataId = evt.dataId;
            String group = evt.group;
            String tenant = evt.tenant;
            String tag = evt.tag;
            List<?> ipList = serverListService.getServerList();
            // 其实这里任何类型队列都可以
            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
            for (int i = 0; i < ipList.size(); i++) {
                queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
            }
            EXECUTOR.execute(new AsyncTask(httpclient, queue));
        }
    }
  1. 通过serverListService.getServerList(); 拿到所有的集群服务器列表.这个列表是配置的所有服务器列表,也包括不健康的服务器; PS:关于集群服务器列表的获取可以看文章 【Nacos源码之配置管理 六】集群模式下服务器之间是如何感知的
  2. 遍历服务器列表ipList,每个服务器(包括自己)组装成一个任务NotifySingleTask,对象里面有一个属性url; 组装url的代码,这个Url就是一会要请求的链接


private static final String URL_PATTERN = "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH
            + "/dataChange"
            + "?dataId={2}&group={3}";
this.url = MessageFormat.format(URL_PATTERN, target, RunningConfigUtils.getContextPath(), dataId,
                    group);

最终组装成的url形式 http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag3. 2中组装成的所有NotifySingleTask放到Queue队列中, 然后传到AsyncTask中; AsyncTask是一个http异步的任务; 它会http请求NotifySingleTask对象中的url

AsyncTask 异步任务

class AsyncTask implements Runnable {
        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
            this.httpclient = httpclient;
            this.queue = queue;
        }
        @Override
        public void run() {
            executeAsyncInvoke();
        }
        private void executeAsyncInvoke() { while (!queue.isEmpty()) {
                NotifySingleTask task = queue.poll();
                String targetIp = task.getTargetIP();
                if (serverListService.getServerList().contains(
                    targetIp)) {
                    // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知
                    if (serverListService.isHealthCheck()
                        && ServerListService.getServerListUnhealth().contains(targetIp)) {
                        // target ip 不健康,则放入通知列表中
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                            task.getLastModified(),
                            LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
                        HttpGet request = new HttpGet(task.url);
                        request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                            String.valueOf(task.getLastModified()));
                        request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
                        if (task.isBeta) {
                            request.setHeader("isBeta", "true");
                        }
                        httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
                    }
                }
            }
        }
        private Queue<NotifySingleTask> queue;
        private CloseableHttpAsyncClient httpclient;
    }

这个任务的执行流程如下:

  1. 如果targetIp不在serverListService.getServerList() 列表中就忽略( 出现这个原因的情况可可能是远程配置文件移除了这台服务器)
  2. 如果远程配置文件服务器是健康的但是targetIp是不健康的,就发起重试流程asyncTaskExecute(task); 这个重试任务会延迟执行
  3. 如果targetIp健康,则发起请求,请求链接就是之前组装的url ;
http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag
  1. 如果这个url请求失败,也会发起重试流程 asyncTaskExecute(task);

重试延迟执行时间

走入重试流程的话,会延迟一定的时间来执行;那么延迟多久呢?

 

/**
     * get delayTime and also set failCount to task;失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步
     *
     * @param task notify task
     * @return delay
     */
    private static int getDelayTime(NotifySingleTask task) {
        int failCount = task.getFailCount();
        int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
        if (failCount <= MAX_COUNT) {
            task.setFailCount(failCount + 1);
        }
        return delay;
    }

失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步

通知配置信息改变

上面第3步骤说发起一个http请求url; 这个请求就是通知所有服务器有数据变更了(包括通知自己),这个通知的方法在CommunicationController.notifyConfigInfo最终执行的方法是DumpServicedump方法



dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);

关于DumpService可以看之前的文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中DumpService的dump方法其实也是new一个DumpTask任务放到任务执行类dumpTaskMgr里面等待执行,这个执行类是单线程操作;

 

public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    }

这个dumpTaskMgr的执行器是哪个呢?



dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager",new DumpProcessor(this));

最终的执行器是 DumpProcessor 那么最终执行的是DumpProcessor的process方法;

DumpProcessor ,Dump指定的配置信息

【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中里面有详细讲解执行器DumpAllProcessor 这个执行器是一次把所有的配置给Dump下来;而DumpProcessor是只dump指定的配置!

因为第四篇文章已经详细讲解了Dump流程,这里只简要说明一下

  1. 先数据库查询这个配置信息是否存在(因为有可能在其他机器上执行了删除操作),存在的话,将指定的配置文件Dump到服务器磁盘文件上; 如果没有查到就把本地的也删除并通知
  2. 更新服务器上配置数据的缓存
  3. 发送LocalDataChangeEvent事件,通知本地数据有变更;最终执行的是DataChangeTask任务;

DataChangeTask 数据变更任务

  1. 遍历所有的长轮询订阅者者(怎么订阅上的?后面会有文章介绍)
  2. 如果是beta发布且不在beta列表直接跳过
  3. 如果tag发布且不在tag列表直接跳过
  4. 发送Http响应(既然是响应,那什么时候请求的呢?后面介绍)通知所有未被上面2、3过滤掉的的订阅者最新的配置数据ConfigInfo

2总结


  1. 增删改数据库数据
  2. 请求所有的集群服务器列表的http请求/communication/dataChange
  3. 每个服务器收到请求之后就更新本地磁盘文件
  4. 每个服务器更新本地缓存
  5. 每个服务器轮询订阅自己的客户端
  6. 如果是beta灰度发布,但是这个客户端不在灰度Ip列表里面则忽略
  7. 如果是tag发布切不在tag列表直接忽略
  8. 发送Http响应通知所有未被上面6、7过滤掉的的订阅者最新的配置数据ConfigInfo,这个数据只是dataId,group等等;没有返回content的内容
  9. 客户端收到响应之后可以调用服务端http请求 ConfigServletInner.doGetConfig方法来获取最新数据;
  10. ConfigServletInner.doGetConfig获取数据是获取服务器本地磁盘中的数据,是没有走数据库的;因为本地磁盘的文件就是最新的数据

3问题


读完这篇文章我们知道了, 增删改配置之后做了哪些事情,相信看完这篇文章之后会有新的问题;

  • [ ] 客户端是如何订阅服务端的?
  • [ ]  服务端又是怎么通知到客户端数据变更的?
  • [ ] 客户端与服务端直接是长连接还是短连接?
  • [ ] 客户端与服务端是推还是拉?

以上问题,我会再后面的文章中一一分析; 欢迎关注我的公众号 进击的老码农(jjdlmn)Nacos源码系列会再公众号中首发;每日会推一些科技资讯、Java、面试题 、源码系列;

相关文章
|
6月前
|
存储 Kubernetes 安全
Nacos-Controller 2.0:使用 Nacos 高效管理你的 K8s 配置
无论是使用 Nacos-Controller 实现配置的双向同步,还是直接在应用中接入 Nacos SDK 以获得更高级的配置管理特性,都能显著提升配置管理的灵活性、安全性和可维护性。使用 Nacos,您能够更好地管理和优化您的应用配置,从而提高系统的稳定性和可靠性。
518 49
|
10月前
|
存储 网络协议 Nacos
高效搭建Nacos:实现微服务的服务注册与配置中心
Nacos(Dynamic Naming and Configuration Service)是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台。它旨在帮助开发者更轻松地构建、部署和管理分布式系统,特别是在微服务架构中。
1710 82
高效搭建Nacos:实现微服务的服务注册与配置中心
|
10月前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
927 151
|
6月前
|
存储 人工智能 测试技术
Nacos托管LangChain应用Prompts和配置,助力你的AI助手快速进化
AI 应用开发中,总有一些让人头疼的问题:敏感信息(比如 API-KEY)怎么安全存储?模型参数需要频繁调整怎么办?Prompt 模板改来改去,每次都得重启服务,太麻烦了!别急,今天我们就来聊聊如何用 Nacos 解决这些问题。
|
8月前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
3129 14
|
11月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评。然而,“客户端不发送心跳检测”是使用中常见的问题之一。本文详细探讨了该问题的原因及解决方法,包括检查客户端配置、网络连接、日志、版本兼容性、心跳检测策略、服务实例注册状态、重启应用及环境变量等步骤,旨在帮助开发者快速定位并解决问题,确保服务正常运行。
186 5
|
11月前
|
Dubbo Cloud Native 应用服务中间件
阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。
在云原生时代,微服务架构成为主流。阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。示例代码展示了如何在项目中实现两者的整合,通过 Nacos 动态调整服务状态和配置,适应多变的业务需求。
317 2
|
11月前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。
1032 216
|
11月前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
580 1
Nacos配置中心
|
11月前
|
网络安全 Nacos 开发者
Nacos作为流行的微服务注册与配置中心,“节点提示暂时不可用”是常见的问题之一
Nacos作为流行的微服务注册与配置中心,其稳定性和易用性备受青睐。然而,“节点提示暂时不可用”是常见的问题之一。本文将探讨该问题的原因及解决方案,帮助开发者快速定位并解决问题,确保服务的正常运行。通过检查服务实例状态、网络连接、Nacos配置、调整健康检查策略等步骤,可以有效解决这一问题。
240 4

热门文章

最新文章