04篇 Nacos Client服务订阅机制之【核心流程】

简介: 04篇 Nacos Client服务订阅机制之【核心流程】

学习不用那么功利,二师兄带你从更高维度轻松阅读源码~

说起Nacos的服务订阅机制,对此不了解的朋友,可能感觉非常神秘,这篇文章就大家深入浅出的了解一下Nacos 2.0客户端的订阅实现。由于涉及到的内容比较多,就分几篇来讲,本篇为第一篇。

Nacos订阅概述

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理。该更新实例的更新实例,该更新本地缓存的更新本地缓存。image.png上图画出了订阅方法的主线流程,涉及的内容较多,处理细节复杂。这里只用把握住核心部分即可。下面就通过代码和流程图来逐步分析上述过程。

从订阅到定时任务开启

我们这里聊的订阅机制,其实本质上就是服务发现的准实时感知。上面已经看到了当执行订阅方法时,会触发定时任务,定时去拉服务器端的数据。所以,本质上,订阅机制就是实现服务发现的一种方式,对照的方式就是直接查询接口了。

NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

方法中的事件监听我们暂时不聊,直接看subscribe方法,这里clientProxy类型为NamingClientProxyDelegate。实例化NacosNamingService时该类被实例化,前面章节中已经讲到,不再赘述。

而clientProxy.subscribe方法在NamingClientProxyDelegate中实现:

// NamingClientProxyDelegate
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 获取缓存中的ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // 如果为null,则进行订阅逻辑处理,基于gRPC协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 定时调度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // ServiceInfo本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

这段方法是不是眼熟啊?对的,在前面分析《Nacos Client服务发现》时我们已经讲过了。看来殊途同归,查询服务列表和订阅最终都调用了同一个方法。

上篇讲了其他流程,我们这里重点看任务调度:

// ServiceInfoUpdateService
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        // 构建UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

该方法包含了构建serviceKey、通过serviceKey判重,最后添加UpdateTask。

而其中的addTask的实现就是发起了一个定时任务:

// ServiceInfoUpdateService
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

定时任务延时1秒执行。

跟踪到这里就告一阶段了。核心功能只有两个:调用订阅方法和发起定时任务。

定时任务都干了啥

UpdateTask封装了订阅机制的核心业务逻辑,先来通过一张流程图看一下都做了啥。image.png有了上述流程图,基本就很清晰的了解UpdateTask所做的事情了。直接贴出run方法的所有代码:

public void run() {
    long delayTime = DEFAULT_DELAY;
    try {
        // 判断该注册的Service是否被订阅,如果没有订阅则不再执行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }
        // 获取缓存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // 根据serviceName从注册中心服务端获取Service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
        // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 处理Service消息
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // 刷新更新时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // 下次更新缓存时间设置,默认为6秒
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // 下次调度刷新时间,下次执行的时间与failCount有关
        // failCount=0,则下次调度时间为6秒,最长为1分钟
        // 即当无异常情况下缓存实例的刷新时间是6秒
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

首先在判断服务是否是被订阅过,实现方法是ChangeNotifier#isSubscribed:

public boolean isSubscribed(String groupName, String serviceName, String clusters) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    return CollectionUtils.isNotEmpty(eventListeners);
}

查看该方法的源码会发现,这里的listenerMap正是最开始的subscribe方法中registerListener注册的EventListener。

run方法后面的业务处理基本上都雷同了,先判断缓存是否有ServiceInfo信息,如果没有则查询注册中心、处理ServiceInfo、更新上次处理时间。

而下面判断ServiceInfo是否失效,正是通过“上次更新时间”与当前ServiceInfo中的“上次更新时间”做比较来判断。如果失效,也会查询注册中心、处理ServiceInfo、更新上次处理时间等一系列操作。

业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

小结

这一篇我们讲了Nacos客户端服务订阅机制的源码,主要有以下步骤:

第一步:订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

第二步:通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

第三步:通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

第四步:UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

第五步:重新计算定时任务时间,循环执行上述流程。

下一篇,我们会在此基础上继续讲解ServiceInfoHolder#processServiceInfo方法中是如何对获取到的实例信息进行处理的。


目录
相关文章
|
4月前
|
缓存 安全 Nacos
nacos常见问题之服务一直在报token expired!如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
430 0
|
4月前
|
网络协议 Java Nacos
nacos常见问题之在web界面 上下线服务时报错 400如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
|
4月前
|
缓存 PHP Nacos
nacos常见问题之服务升级后nacos控制台看到都是不可用重启nacos后恢复如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
319 4
|
4月前
|
存储 Nacos 数据安全/隐私保护
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
66 1
|
1月前
|
负载均衡 监控 Java
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
|
4月前
|
安全 Linux Nacos
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
239 0
|
2月前
|
监控 安全 网络安全
inishConnect(..) failed: Connection refused,服务本地正常服务器网关报400,nacos服务实例不能下线
总之,这种问题需要通过多方面的检查和校验来定位和解决,并可能需要结合实际环境的具体情况来进行相应的调整。在处理分布式系统中这类问题时,耐心和细致的调试是必不可少的。
63 13
|
1月前
|
Kubernetes Nacos 微服务
【技术难题破解】Nacos v2.2.3 + K8s 微服务注册:强制删除 Pod 却不消失?!7步排查法+实战代码,手把手教你解决Nacos Pod僵死问题,让服务瞬间满血复活!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心受到欢迎,但有时会遇到“v2.2.3 k8s 微服务注册nacos强制删除 pod不消失”的问题。本文介绍此现象及其解决方法,帮助开发者确保服务稳定运行。首先需检查Pod状态与事件、配置文件及Nacos配置,确认无误后可调整Pod生命周期管理,并检查Kubernetes版本兼容性。若问题持续,考虑使用Finalizers、审查Nacos日志或借助Kubernetes诊断工具。必要时,可尝试手动强制删除Pod。通过系统排查,通常能有效解决此问题。
41 0
|
1月前
|
Java Nacos 开发工具
【Nacos】心跳断了怎么办?!8步排查法+实战代码,手把手教你解决Nacos客户端不发送心跳检测问题,让服务瞬间恢复活力!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心。然而,“客户端不发送心跳检测”的问题时有发生,可能导致服务实例被视为离线。本文介绍如何排查此类问题:确认Nacos服务器地址配置正确;检查网络连通性;查看客户端日志;确保Nacos SDK版本兼容;调整心跳检测策略;验证服务实例注册状态;必要时重启应用;检查影响行为的环境变量。通过这些步骤,通常可定位并解决问题,保障服务稳定运行。
52 0
|
1月前
|
网络安全 Nacos 开发者
【Nacos】神操作!节点提示暂时不可用?别急!7步排查法+实战代码,手把手教你解决Nacos服务实例状态异常,让服务瞬间满血复活!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心,虽广受好评,但仍可能遇到“节点提示暂时不可用”的问题。本文解析此现象及其解决之道。首先需理解该提示意味着服务实例未能正常响应。解决步骤包括:检查服务状态与网络、审查Nacos配置、调整健康检查策略、重启服务及分析日志。通过系统化排查,可有效保障服务稳定运行。
36 0