前言
上一篇 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 介绍了客户端会像服务端发起长轮询来获取变更数据, 其实在客户端发起长轮询的请求相当于向服务端发起了一个订阅; 因为服务端接受到客户端的请求之后如果没有查询到变更数据是不会返回的;而是会等待29.5s(当然时间可配),在这个29.5s时间内,服务端如果检测到有数据变更,会立马像客户端发起响应请求,因为这个时间内服务端还是有hold住客户端发过来的请求,所以能发回响应数据; hold住request是用的AsyncContext异步
[x] 服务端是怎么通知到客户端数据变更的
[x] 如何以 拉模式 长轮询服务端
LongPollingService
LongPollingService 是一个长轮询服务,但是它是处理客户端的长轮询;LongPollingService 还处理服务端本地数据变更之后的事情
服务端数据变更事件
LongPollingService实现了AbstractEventListener的onEvent方法; 这是一个发布订阅模式; 可以看 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式;
Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器 中有介绍修改数据之后的一些流程,就有讲到这里;这个是一个本地数据变更事件 ;DataChangeTask的时候留下了2个问号,前面挖的坑现在是填的时候了;
DataChangeTask
DataChangeTask 服务端数据变更任务
配置数据有变更的时候执行这个方法; 遍历allSubs.iterator();得到对象 ClientLongPolling ;这是一个客户端长轮询的对象;里面保存了一些例如ip、clientMd5Map、asyncContext等等还有其他一些数据;
asyncContext :Servlet 3.0新增了异步处理, 这个对象持有客户端的 request和 response; 就是通过这个对象,在服务端有了数据变更的情况下,能够里面的将变更数据返回响应给客户端; AsyncContext异步请求的用法: https://shirenchuang.blog.csdn.net/article/details/100809937
那么就剩下一个很重要的问题就是 allSubs 是什么时候订阅上的?
客户端发起长轮询
上一篇文章 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 分析了客户端发起长轮询的请求;如下
那么看看服务端这个listener
做了什么
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP线程调用,否则离开后容器会立即发送响应 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
获取客户端的请求参数;str: 长轮询的超时时间,默认30s;详细可见上一篇文章noHangUpFlag:不挂起标识,这个标识为false的时候,会把客户端的请求挂起;等待超时或者数据变更通知;如果客户端监听的数据是首次初始化,这个标识为true;delayTime:延时时间;为了避免客户端请求超时,需要提前这个时间返回响应;这个数据是在配置管理中配置的,默认500毫秒,详细见 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中
对比客户端和服务端MD5是否相同,有不同则直接返回不同的dataid+group响应;
如果2中没有不同,并且如果noHangUpFlag=true 则直接返回,不挂起请求;
2和3都不满足,则使用AsyncContext,将请求异步化,直接挂起; 超时时间为str-delayTIme,str是客户端设置的时间如下所示,delayTime默认500毫秒,可以在管理后台配置(见上面具体如何配置),如果都不主动配置,那么超时时间是30000-500=29500; 29.5秒;
执行ClientLongPolling任务
ClientLongPolling任务
这个类有如下属性
asyncContext中持有客户端的请求;clientMd5Map包含了客户端所要监听的数据的MD5;
ClientLongPolling这个任务类执行的时候,是把 一个任务延迟了timeoutTime之后再执行的,并且返回asyncTimeoutFuture,这个timeoutTime就是上面说到的超时时间,例如29.5s;allSubs中; 等待有数据变更的时候,可以通知到这个客户端,因为当前实例有asyncContext ,可以相应客户端的请求;
29.5s之后做了什么事情
看看timeoutTime之后执行的方法体做了什么
isFixedPolling()下面再讲,暂时忽略;我们看到最终执行的是
- 删除订阅关系,为啥要删除,因为这个
allSubs
在配置数据有变更的时候会遍历这个来进行通知,这里相当于本次请求要结束了,所以删除,不让通知了 - 执行
sendResponse(null);
方法;
timeoutTime超时时间到了之后,服务端会直接结束本次请求;然后客户端又会立马重新发起新一轮请求,重复这个过程;相当于是说客户端每隔timeoutTime时间之后,就发起一次请求判断服务端是否有变更数据;
那么问题来了,如果只是这样的话,那么就是服务端纯粹的使用 拉模式, 并没有服务端的推模式呀?
服务端变更数据使用推模式推送数据
还记得文章一开头就说到一个事件吗,LocalDataChangeEvent 事件,服务端中修改了配置数据之后,就通知这个事件,这个事件最终会执行DataChangeTask任务;
/**下面删除了部分代码;保留关键点**/ class DataChangeTask implements Runnable { @Override public void run() { try { for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { iter.remove(); // 删除订阅关系 clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { } }
遍历的allSubs; 这个allSubs是上面介绍过,客户端发起长轮询的请求的时候注册上的;
比较客户端订阅的配置数据MD5与当前是否一致,这个时候基本是不一致的,因为有修改嘛;
做了一些过滤操作之后,sendResponse(Arrays.asList(groupKey));配置项;发送了之后,本次请求也就结束了;客户端又会重新再发起新的一轮请求;
客户端拉+服务端推
上面分析完了之后,我们总结一下;
客户端发起订阅请求;
服务端接收到请求之后,立马去查询一次数据是否变更;hold住一定的时间(默认29.5s)
①.如果这期间客户端所监听的数据都一直没有变更,在时间到达之后,结束客户端的本次请求;客户端又回到步骤1;
就是这样一个不停的轮询的过程; 但是注意,服务端返回的只是哪些配置项有变更(只返回dataid+group等等,并没有返回content),客户端拿到这些变更配置项之后,还有主动请求配置项的content,来更新自己的缓存
isFixedPolling 固定长轮询
上面在介绍的时候,我们选择性忽略了这个固定长轮询;现在来介绍一下拉
如何设置固定长轮询
新增一个元数据配置项, DataId是 com.alibaba.nacos.meta.switch ; Group是DEFAULT_GROUP ;注意这个配置是内置的,一定要这样配置;
将配置isFixedPolling=true 打开固定长轮询
fixedPollingInertval=10000;固定长轮询的间隔时间
fixedDelayTime=500 延迟时间; 例如间隔时间是10s,延迟时间是0.5秒; 那么每隔9.5s执行一次轮询;0.5s的时间是为了防止请求超时的;
两种模式的比较
拉+推 的模式具有时效性;推荐使用 拉+推模式
总结