addTenantListeners
添加监听,这里主要是通过 dataId , group 来获取 cache 本地缓存的配置信息,然后再将 Listener 也传给 cache 统一管理。
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException { group = null2defaultGroup(group); String tenant = agent.getTenant(); CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); for (Listener listener : listeners) { cache.addListener(listener); } }
回调触发
如果 md5 值发生变化过后就会调用 safeNotifyListener
方法然后将配置信息发送给对应的监听器
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }
服务端响应
当服务端收到请求后,会 hold 住当前请求,如果有变化就返回,如果没有变化就等待超时之前返回无变化。
/** * The client listens for configuration changes. */ @PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
LongPollingService
核心处理类 LongPollingService
/** * Add LongPollingClient. * * @param req HttpServletRequest. * @param rsp HttpServletResponse. * @param clientMd5Map clientMd5Map. * @param probeRequestSize probeRequestSize. */ public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // Do nothing but set fix polling timeout. } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // Must be called by http thread, or send response. final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout() is incorrect, Control by oneself asyncContext.setTimeout(0L); ConfigExecutor.executeLongPolling( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
参考链接
blog.csdn.net/jason_jiaho…www.cnblogs.com/lockedsher/…