在后台管理界面可以直接增删改查所有的配置数据,那么问题来了
- [x] 增删改数据这个操作,除了落库,还做了哪些操作?
1新增配置数据
打开后台新建一个配置
- 如果是新增先访问Http请求ConfigController.getConfig()检验dataId,group等等是否已经存在,已经存在提示不让新增
- 不存在可以新增,则访问 ConfigController.publishConfig方法发布配置数据;
ConfigController.publishConfig 发布数据
/** * 增加或更新非聚合数据。 * * @throws NacosException */ @RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response) throws NacosException { //部分省略.... if (AggrWhitelist.isAggrDataId(dataId)) { log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group); throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr"); } final Timestamp time = TimeUtils.getCurrentTime(); String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }
- persistService.insertOrUpdate将配置信息持久化到数据库
- 发起配置数据有变化事件的通知
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
关于事件通知不懂的可以查看这篇文章 原 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式
ConfigDataChangeEvent这个事件只有AsyncNotifyService 监听了那好,我们主要看AsyncNotifyService做了哪些事情;
AsyncNotifyService 异步通知服务
@Override public void onEvent(Event event) { // 并发产生 ConfigDataChangeEvent if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; List<?> ipList = serverListService.getServerList(); // 其实这里任何类型队列都可以 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (int i = 0; i < ipList.size(); i++) { queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta)); } EXECUTOR.execute(new AsyncTask(httpclient, queue)); } }
- 通过serverListService.getServerList(); 拿到所有的集群服务器列表.这个列表是配置的所有服务器列表,也包括不健康的服务器; PS:关于集群服务器列表的获取可以看文章 【Nacos源码之配置管理 六】集群模式下服务器之间是如何感知的
- 遍历服务器列表ipList,每个服务器(包括自己)组装成一个任务NotifySingleTask,对象里面有一个属性url; 组装url的代码,这个Url就是一会要请求的链接
private static final String URL_PATTERN = "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}"; this.url = MessageFormat.format(URL_PATTERN, target, RunningConfigUtils.getContextPath(), dataId, group);
最终组装成的url形式 http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag
3. 2中组装成的所有NotifySingleTask放到Queue队列中, 然后传到AsyncTask中; AsyncTask是一个http异步的任务; 它会http请求NotifySingleTask对象中的url
AsyncTask 异步任务
class AsyncTask implements Runnable { public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) { this.httpclient = httpclient; this.queue = queue; } @Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) { NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( targetIp)) { // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知 if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) { // target ip 不健康,则放入通知列表中 ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP); if (task.isBeta) { request.setHeader("isBeta", "true"); } httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task)); } } } } private Queue<NotifySingleTask> queue; private CloseableHttpAsyncClient httpclient; }
这个任务的执行流程如下:
- 如果targetIp不在serverListService.getServerList() 列表中就忽略( 出现这个原因的情况可可能是远程配置文件移除了这台服务器)
- 如果远程配置文件服务器是健康的但是targetIp是不健康的,就发起重试流程
asyncTaskExecute(task);
这个重试任务会延迟执行 - 如果targetIp健康,则发起请求,请求链接就是之前组装的url ;
http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag
- 如果这个url请求失败,也会发起重试流程
asyncTaskExecute(task);
重试延迟执行时间
走入重试流程的话,会延迟一定的时间来执行;那么延迟多久呢?
/** * get delayTime and also set failCount to task;失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步 * * @param task notify task * @return delay */ private static int getDelayTime(NotifySingleTask task) { int failCount = task.getFailCount(); int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS; if (failCount <= MAX_COUNT) { task.setFailCount(failCount + 1); } return delay; }
失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步
通知配置信息改变
上面第3步骤说发起一个http请求url; 这个请求就是通知所有服务器有数据变更了(包括通知自己),这个通知的方法在CommunicationController.notifyConfigInfo最终执行的方法是DumpService的dump方法
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
关于DumpService可以看之前的文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中DumpService的dump方法其实也是new一个DumpTask任务放到任务执行类dumpTaskMgr里面等待执行,这个执行类是单线程操作;
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); }
这个dumpTaskMgr的执行器是哪个呢?
dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager",new DumpProcessor(this));
最终的执行器是 DumpProcessor 那么最终执行的是DumpProcessor的process方法;
DumpProcessor ,Dump指定的配置信息
在 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中里面有详细讲解执行器DumpAllProcessor 这个执行器是一次把所有的配置给Dump下来;而DumpProcessor是只dump指定的配置!
因为第四篇文章已经详细讲解了Dump流程,这里只简要说明一下
- 先数据库查询这个配置信息是否存在(因为有可能在其他机器上执行了删除操作),存在的话,将指定的配置文件Dump到服务器磁盘文件上; 如果没有查到就把本地的也删除并通知
- 更新服务器上配置数据的缓存
- 发送LocalDataChangeEvent事件,通知本地数据有变更;最终执行的是DataChangeTask任务;
DataChangeTask 数据变更任务
- 遍历所有的长轮询订阅者者(怎么订阅上的?后面会有文章介绍)
- 如果是beta发布且不在beta列表直接跳过
- 如果tag发布且不在tag列表直接跳过
- 发送Http响应(既然是响应,那什么时候请求的呢?后面介绍)通知所有未被上面2、3过滤掉的的订阅者最新的配置数据ConfigInfo
2总结
- 增删改数据库数据
- 请求所有的集群服务器列表的http请求
/communication/dataChange
- 每个服务器收到请求之后就更新本地磁盘文件
- 每个服务器更新本地缓存
- 每个服务器轮询订阅自己的客户端
- 如果是beta灰度发布,但是这个客户端不在灰度Ip列表里面则忽略
- 如果是tag发布切不在tag列表直接忽略
- 发送Http响应通知所有未被上面6、7过滤掉的的订阅者最新的配置数据ConfigInfo,这个数据只是dataId,group等等;没有返回content的内容
- 客户端收到响应之后可以调用服务端http请求 ConfigServletInner.doGetConfig方法来获取最新数据;
- ConfigServletInner.doGetConfig获取数据是获取服务器本地磁盘中的数据,是没有走数据库的;因为本地磁盘的文件就是最新的数据
3问题
读完这篇文章我们知道了, 增删改配置之后做了哪些事情,相信看完这篇文章之后会有新的问题;
- [ ] 客户端是如何订阅服务端的?
- [ ] 服务端又是怎么通知到客户端数据变更的?
- [ ] 客户端与服务端直接是长连接还是短连接?
- [ ] 客户端与服务端是推还是拉?
以上问题,我会再后面的文章中一一分析; 欢迎关注我的公众号 进击的老码农(jjdlmn)Nacos源码系列会再公众号中首发;每日会推一些科技资讯、Java、面试题 、源码系列;