Nacos配置中心之服务端长轮询处理机制

简介: Nacos配置中心之服务端长轮询处理机制

Nacos配置中心之服务端长轮询处理机制

接着上回说,客户端进行长轮询其实是使用定时线程来定时调用/v1/cs/configs/listener接口,那么服务端的这个接口实现逻辑又是怎样的呢?今天我们就看看这一块的内容。

客户端发起http请求进行长轮询,调用接口/listner 服务端在nacos-config模块中的ConfigController类中:

ConfigController的listener()方法

ConfigController的listener()方法:

/**
 * 比较MD5
 */
@PostMapping("/listener")
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
  1. 获取客户端需要监听的可能发送变化的配置,计算MD5值
  2. inner.doPollingConfig执行长轮询请求

doPollingConfig()方法

/**
 * 轮询接口
 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize)
    throws IOException {

    // 长轮询
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }

    // else 兼容短轮询逻辑
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

    // 兼容短轮询result
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);

    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);

    /**
     * 2.0.4版本以前, 返回值放入header中
     */
    if (versionNum < START_LONGPOLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }

    // 禁用缓存
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}
  1. 判断当前请求是否为长轮询,如果是,调用LongPollingService的addLongPollingClient()方法
  2. 不是长轮询,就立即返回结果

LongPollingService的addLongPollingClient()

这个方法主要把客户端的长轮询请求封装成ClientLongPolling交给scheduler执行

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                 int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // do nothing but set fix polling timeout
    } else {
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                clientMd5Map.size(), probeRequestSize, changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
    asyncContext.setTimeout(0L);

    scheduler.execute(
        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
  1. 获取客户端请求的超时时间,减去500ms后赋值给timeout变量。
  2. 判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行
  3. 和服务端的数据进行MD5对比,如果发送变化则直接返回
  4. scheduler.execute 执行ClientLongPolling线程

ClientLongPolling

ClientLongPolling是一个线程,run方法如下:

@Override
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                /**
                 * 删除订阅关系
                 */
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    List<String> changedGroups = MD5Util.compareMd5(
                        (HttpServletRequest)asyncContext.getRequest(),
                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);

    allSubs.add(this);
}
  1. 通过scheduler.schedule启动一个定时任务,并延时时间为29.5s
  2. 将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系。
  3. 定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。
  4. 通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端

所谓长轮询就是服务端收到请求之后,不立即返回,而是在延29.5s才把请求结果返回给客户端,这使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态。

总结

通过对服务端/v1/cs/configs/listener接口的分析,我们知道这个接口处理客户端的长轮询,使用定时线程,29.5s之后查看是否有信息变更,有的话再返回给客户端信息,保证30s内有数据变化内察觉到。

相关文章
|
27天前
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
2月前
|
安全 Nacos 数据安全/隐私保护
升级指南:从Nacos 1.3.0 到 2.3.0,并兼容 Seata 的鉴权配置
本文详细介绍了如何在微服务环境下从 Nacos 1.3.0 升级到 2.3.0,并确保 Seata 各版本的兼容性。作者小米分享了升级过程中的关键步骤,包括备份配置、更新鉴权信息及验证测试等,并解答了常见问题。通过这些步骤,可以帮助读者顺利完成升级并提高系统的安全性与一致性。
88 8
升级指南:从Nacos 1.3.0 到 2.3.0,并兼容 Seata 的鉴权配置
|
2月前
|
运维 Java Nacos
Spring Cloud应用框架:Nacos作为服务注册中心和配置中心
Spring Cloud应用框架:Nacos作为服务注册中心和配置中心
|
2月前
|
应用服务中间件 Nacos 数据库
Nacos 1.2.1 集群搭建(三) Nginx 配置 集群
Nacos 1.2.1 集群搭建(三) Nginx 配置 集群
53 1
|
2月前
|
缓存 Cloud Native Java
【紧急救援】Nacos配置上线后失效?手把手教你如何轻松搞定命名空间修改难题!
【8月更文挑战第15天】Nacos是关键的云原生服务管理平台,用于动态服务发现与配置管理。但在使用其管理微服务配置时,可能会遇到命名空间内的配置更新后不生效的问题。本文探讨此问题并提供解决方案。首先需确认Nacos服务器运行正常及客户端正确连接。接着检查客户端缓存配置,可通过禁用缓存或缩短缓存间隔来即时更新配置。例如,在Spring Cloud Alibaba Nacos配置中心中启用自动刷新功能,并设置每5秒拉取新配置。同时,对于新增配置项,需重启客户端应用。还需检查Nacos服务器日志排除异常,并考虑升级Nacos版本解决兼容性问题。通过这些步骤,通常可有效解决配置不生效的难题。
77 0
|
2月前
|
安全 Nacos 数据库
【技术安全大揭秘】Nacos暴露公网后被非法访问?!6大安全加固秘籍,手把手教你如何保护数据库免遭恶意篡改,打造坚不可摧的微服务注册与配置中心!从限制公网访问到启用访问控制,全方位解析如何构建安全防护体系,让您从此告别数据安全风险!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心,但其公网暴露可能引发数据库被非法访问甚至篡改的安全隐患。本文剖析此问题并提供解决方案,包括限制公网访问、启用HTTPS、加强数据库安全、配置访问控制及监控等,帮助开发者确保服务安全稳定运行。
131 0
|
2月前
|
安全 Nacos 数据安全/隐私保护
【技术干货】破解Nacos安全隐患:连接用户名与密码明文传输!掌握HTTPS、JWT与OAuth2.0加密秘籍,打造坚不可摧的微服务注册与配置中心!从原理到实践,全方位解析如何构建安全防护体系,让您从此告别数据泄露风险!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心,但其连接用户名和密码的明文传输成为安全隐患。本文探讨加密策略提升安全性。首先介绍明文传输风险,随后对比三种加密方案:HTTPS简化数据保护;JWT令牌减少凭证传输,适配分布式环境;OAuth2.0增强安全,支持多授权模式。每种方案各有千秋,开发者需根据具体需求选择最佳实践,确保服务安全稳定运行。
123 0
|
3月前
|
Java Nacos 数据库
使用 nacos 搭建注册中心及配置中心
使用 nacos 搭建注册中心及配置中心
82 5
|
3月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
83 3
|
2月前
|
SQL 关系型数据库 MySQL
Nacos 1.2.1 集群搭建(二)MySQL、cluster 配置
Nacos 1.2.1 集群搭建(二)MySQL、cluster 配置
53 1

热门文章

最新文章