Nacos服务心跳和健康检查源码介绍

简介: Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口/instance/beat发送心跳。

一、服务心跳


 Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口/instance/beat发送心跳。

客户端服务在注册服务的时候会增加一个心跳的任务,如下图所示:

1005447-20210203110146114-2097060231.png

首先看下BeatInfo这个类,重点看标注的字段,该字段是给周期任务设定时间,如下图:

1005447-20210203110403287-894301846.png

该方法内部定义的一个DEFAULT_HEART_BEAT_INTERVAL的常量,设定5秒:

1005447-20210203110527557-859130093.png

接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中,如下图:

1005447-20210203110732574-1495415226.png

重点部分就是看BeatTask,BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法

1005447-20210203110917273-844386278.png

1005447-20210203111001657-634988841.png

接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册,整体执行流程如下图:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //设置心跳间隔
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    //判断有无心跳内容
    //如果存在心跳内容则不是轻量级心跳就转化为RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    //获取实例的信息
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    //如果实例不存在
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //根据您心跳内容创建一个实例信息
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        //注册实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    //获取服务的信息
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    //不存在的话,要创建一个进行处理
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    //开启心跳检查任务
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    //5秒间隔
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    //告诉客户端不需要带上心跳信息了,变成轻量级心跳了
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

接下来我们看一下processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法,

1005447-20210203111337939-780296120.png

该方法内部主要就是更新对应实例下心跳时间,整体上如下图:

1005447-20210203111511141-903787977.png

至此完成了从客户端到服务端更新实例的心跳时间,下图是整体的时序图:


服务的健1005447-20210203111725972-961856416.png

二、服务的健康检查


Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。

当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中,如下图:

1005447-20210203113625578-2011349034.png

ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:

public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        //获取服务所有实例信息
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            //如果心跳时间超过15秒则设置该实例信息为不健康状况
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            //如果心跳时间超过30秒则删除该实例信息
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}

首先我们来看一下deleteIp方法,该方法内部主要通过构建删除请求,发送删除请求,如下图:

1005447-20210203113847929-1505177188.png

删除实例的接口如下图:

1005447-20210203114100774-1998085533.png

内部通过调用ServiceManager的removeInstance方法,如下图:

1005447-20210203114232082-1392434867.png

重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息,如下图:

1005447-20210203114459723-1111599452.png

到此完成删除实例的过程,整体的时序图如下:

1005447-20210203114634106-2130560652.png

接下来我们看标记不健康时候的代码,这部分代码在客户端注册的时候也出现相同的代码,只是我们略过了,这部分也是观察者模式的重要体现,从这里我们可以学习到的东西在于结合Spring的事件机制,轻松实现观察者模式,当然这个里面也有部分我感觉写的不太好,哈哈,大佬们看到勿喷。

1005447-20210203114800616-680974236.png

首先我们看serviceChanged方法,该方法主要是发布一个服务不健康的事件,如下图:

1005447-20210203115054572-1717093596.png

接下来我们看下如何处理这个事件,这个时候涉及PushService这个类,整体的继承结构如下图:

1005447-20210203115203034-1847618815.png

我们看到该类的继承ApplicationListener接口,该接口是一个支持泛型的接口,传入了ServiceChangeEvent的类,此处就是对事件的处理,如下图:

1005447-20210203115322522-2142129798.png

接下来看一下onApplicationEvent方法,这个方法主要完成了准备数据,发送数据这几件事情:

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            //获取所有需要推送的客户端
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                //超时的不删除跳过处理
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                //准备UDP数据
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                //发送数据
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            //发送完成删除
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    //增加待推送的任务
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}

接下里我们重点看下udpPush的方法,整个方法主要是通过一个Map对象来记录UDP请求,如果没收到就重试发送请求,整体如下:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    //如果大于最大的尝试次数
    //移除发送的数据和待确认的key
    //失败推送的次数+1
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        //记录UDP请求的返回信息
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //发送UDP请求
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        //如果UDP没收到返回信息 每10秒尝试一下
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}

服务端有发送,那么客户端就有接收的,接收部分我理解上是服务发现部分,这里我们就不做过多介绍,待下一篇再来聊聊。

目录
打赏
0
0
0
0
28
分享
相关文章
阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。
在云原生时代,微服务架构成为主流。阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。示例代码展示了如何在项目中实现两者的整合,通过 Nacos 动态调整服务状态和配置,适应多变的业务需求。
78 2
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
99 1
"Nacos架构深度解析:一篇文章带你掌握业务层四大核心功能,服务注册、配置管理、元数据与健康检查一网打尽!"
【10月更文挑战第23天】Nacos 是一个用于服务注册发现和配置管理的平台,支持动态服务发现、配置管理、元数据管理和健康检查。其业务层包括服务注册与发现、配置管理、元数据管理和健康检查四大核心功能。通过示例代码展示了如何在业务层中使用Nacos,帮助开发者构建高可用、动态扩展的微服务生态系统。
145 0
"Nacos 2.1.0版本数据库配置写入难题破解攻略:一步步教你排查连接、权限和配置问题,重启服务轻松解决!"
【10月更文挑战第23天】在使用Nacos 2.1.0版本时,可能会遇到无法将配置信息写入数据库的问题。本文将引导你逐步解决这一问题,包括检查数据库连接、用户权限、Nacos配置文件,并提供示例代码和详细步骤。通过这些方法,你可以有效解决配置写入失败的问题。
150 0
|
8月前
|
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
497 0
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
inishConnect(..) failed: Connection refused,服务本地正常服务器网关报400,nacos服务实例不能下线
总之,这种问题需要通过多方面的检查和校验来定位和解决,并可能需要结合实际环境的具体情况来进行相应的调整。在处理分布式系统中这类问题时,耐心和细致的调试是必不可少的。
124 13
【技术难题破解】Nacos v2.2.3 + K8s 微服务注册:强制删除 Pod 却不消失?!7步排查法+实战代码,手把手教你解决Nacos Pod僵死问题,让服务瞬间满血复活!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心受到欢迎,但有时会遇到“v2.2.3 k8s 微服务注册nacos强制删除 pod不消失”的问题。本文介绍此现象及其解决方法,帮助开发者确保服务稳定运行。首先需检查Pod状态与事件、配置文件及Nacos配置,确认无误后可调整Pod生命周期管理,并检查Kubernetes版本兼容性。若问题持续,考虑使用Finalizers、审查Nacos日志或借助Kubernetes诊断工具。必要时,可尝试手动强制删除Pod。通过系统排查,通常能有效解决此问题。
129 0
【Nacos】心跳断了怎么办?!8步排查法+实战代码,手把手教你解决Nacos客户端不发送心跳检测问题,让服务瞬间恢复活力!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心。然而,“客户端不发送心跳检测”的问题时有发生,可能导致服务实例被视为离线。本文介绍如何排查此类问题:确认Nacos服务器地址配置正确;检查网络连通性;查看客户端日志;确保Nacos SDK版本兼容;调整心跳检测策略;验证服务实例注册状态;必要时重启应用;检查影响行为的环境变量。通过这些步骤,通常可定位并解决问题,保障服务稳定运行。
347 0
【Nacos】神操作!节点提示暂时不可用?别急!7步排查法+实战代码,手把手教你解决Nacos服务实例状态异常,让服务瞬间满血复活!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心,虽广受好评,但仍可能遇到“节点提示暂时不可用”的问题。本文解析此现象及其解决之道。首先需理解该提示意味着服务实例未能正常响应。解决步骤包括:检查服务状态与网络、审查Nacos配置、调整健康检查策略、重启服务及分析日志。通过系统化排查,可有效保障服务稳定运行。
233 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等