十.SpringCloud源码剖析-Eureka Server服务续约

简介: 前一章节我们分析了一下Eureka Server实现服务注册的流程,这一章节我们把Eureka Server服务续约流程看了,请结合《[Eureka Client服务续约](https://blog.csdn.net/u014494148/article/details/108315405)》

系列文章目录

一.SpringCloud源码剖析-Eureka核心API

二.SpringCloud源码剖析-Eureka Client 初始化过程

三.SpringCloud源码剖析-Eureka服务注册

四.SpringCloud源码剖析-Eureka服务发现

五.SpringCloud源码剖析-Eureka Client服务续约

六.SpringCloud源码剖析-Eureka Client取消注册

七.SpringCloud源码剖析-Eureka Server的自动配置

八.SpringCloud源码剖析-Eureka Server初始化流程

九.SpringCloud源码剖析-Eureka Server服务注册流程

十.SpringCloud源码剖析-Eureka Server服务续约

十一.SpringCloud源码剖析-Eureka Server服务注册表拉取

十二.SpringCloud源码剖析-Eureka Server服务剔除

十三.SpringCloud源码剖析-Eureka Server服务下线


### 前言 前一章节我们分析了一下Eureka Server实现服务注册的流程,这一章节我们把Eureka Server服务续约流程看了,请结合《[Eureka Client服务续约](https://blog.csdn.net/u014494148/article/details/108315405)》

Eureka Server 服务续约流程

Eureka客户端定时向服务端发起服务续约的请求,也是通过ServeltContainer来接待请求,请求中携带了当前续约服务的状态和最后修改时间,ServeltContainer最终会调用 InstanceResource来处理请求,源码如下

@Produces({
   
   "application/xml", "application/json"})
public class InstanceResource {
   
   
    ...省略...
     /**
       来自客户端实例的续订租约的put请求
     * A put request for renewing lease from a client instance.
     *
     * @param isReplication
     *           请求头isreplicated 是否是从其他节点复制的
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @param overriddenStatus
     *             覆盖状态
     *            overridden status if any.
     * @param status :续约的服务状态,一般都是UP
     *            the {@link InstanceStatus} of the instance.
     * @param lastDirtyTimestamp 此实例信息更新的最后时间戳
     *            last timestamp when this instance information was updated.
            指示操作是成功还是失败的响应。
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
   
   
            //是否是其他节点复制
        boolean isFromReplicaNode = "true".equals(isReplication);
        //执行续约流程
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
   
   
            //没续约成功,返回NOT_FOUND状态
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
       // 检查是否需要根据最后更新间戳进行同步,客户端实例可能已更改了某些值
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
   
   
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
   
   
                    //修改服务端维护的注册表中注册的服务的覆盖的状态
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
   
   
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}

这里方法中调用了InstanceRegistry#renew的续约方法,跟踪下去

@Override
    public boolean renew(final String appName, final String serverId,
            boolean isReplication) {
   
   
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                + isReplication);
        //获取服务端本地存储的服务注册表,
        //其实是从一个AbstractInstanceRegistry中名字叫 registry的map中获取
        //因为服务注册就是存储到这个registry的map中的
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
   
   
            //找到当前续约服务名对应的服务
            if (input.getName().equals(appName)) {
   
   
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
   
   
                    //找到服务名相同,服务id也相同的服务
                    if (info.getId().equals(serverId)) {
   
   
                        instance = info;
                        break;
                    }
                }
                //抛出续约事件
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                        instance, isReplication));
                break;
            }
        }
        //调用super的续约方法进行续约
        return super.renew(appName, serverId, isReplication);
    }

这里做了如下事情

  1. 找到服务端维护的服务注册表registry(Map)
  2. 根据续约参数中的服务名,以及服务Id,找到续约的服务
  3. 抛出续约事件
  4. 调用父类的续约逻辑

我们继续看它父类的续约方法PeerAwareInstanceRegistryImpl#renew

  /*
     * (non-Javadoc)
     *
     * @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String,
     * java.lang.String, long, boolean)
     */
    public boolean renew(final String appName, final String id, final boolean isReplication) {
   
   
        //调用super的续约方法
        if (super.renew(appName, id, isReplication)) {
   
   
            //续约的实例信息复制到其他的Eureka节点
            //注意这里是传入的Action是Action.Heartbeat 心跳
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

这里做了两个事情

  • 1是调用super的续约方法
  • 2是续约成功后,调用replicateToPeers把续约成功的实例信息同步到其他的Eureka节点上,这个方法我们在分析注册流程的时候已经看过,不管是注册,续约,取消注册,状态改变等操作都要执行replicateToPeers进行Eureka集群节点之间的数据同步.

我们重点跟踪一下super.renew方法即:AbstractInstanceRegistry#renew

/**
     * Marks the given instance of the given app name as renewed, and also marks whether it originated from
     * replication.
     *
     * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
     */
     //appName应用名,id实例ID,isReplication是否是复制
    public boolean renew(String appName, String id, boolean isReplication) {
   
   
        //增加给定统计信息的计数器,EurekaMonitors.counter加一,用作Eureka监控
        RENEW.increment(isReplication);
        //从服务端维护的服务实例注册表中获取到当前续约服务名对应的服务信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
   
   
            //获取服务的租约对象
            leaseToRenew = gMap.get(id);
        }
        //如果没找到服务,返回false
        if (leaseToRenew == null) {
   
   
            //续约服务没找到的计数器增加
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
   
   
            //根据租约对象获取到服务的实例信息对象InstanceInfo
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            //如果服务实例信息不为空
            if (instanceInfo != null) {
   
   
                // touchASGCache(instanceInfo.getASGName());
                //获取服务的状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                        //如果服务的状态是UNKNOWN,说明服务可能被删除了,需要重新注册,返回false
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
   
   
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                     //续约服务没找到的计数器增加
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                //实例状态与实例的OverriddenStatus覆盖实例状态不同。因此将状态设置为OverriddenStatus状态
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
   
   
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    //修改状态为overriddenInstanceStatus,
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            //增加最后更新计数
            renewsLastMin.increment();
            //执行续约,com.netflix.eureka.lease.Lease#renew
            leaseToRenew.renew();
            //续约成功
            return true;
        }
    }

这个方法又做了什么呢

  • 1是增加续约计数,Eureka监控用的,然后根据续约的服务名在本地注册表registry(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry)获取服务的Lease租约对象,然后根据租约对象获取InstanceInfo服务注册实例
  • 2如果InstanceInfo为空,说明服务可能被删除,增加续约失败计数后,返回false
  • 3如果当前实例的状态和overriddenInstanceStatus状态不一致,就修改实例状态为overriddenInstanceStatus
  • 4增加最后更新计数,然后调用 Lease#renew方法修改lastUpdateTimestamp,即最后续约时间更新,最后续约时间更新了,Eureka Server就知道服务实例续约成功,否则到了最后续约失效时间服务端会考虑剔除服务。

当然,服务续约成功之后还会回到 replicateToPeers 方法中把续约的服务信息同步到其他的Eureka节点,我们回到replicateToPeers方法看一下

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
   
   
        Stopwatch tracer = action.getTimer().start();
        try {
   
   
            if (isReplication) {
   
   
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
   
   
                return;
            }
            //循环除开自己的所有Eureka节点都要同步
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
   
   
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
   
   
                    continue;
                }
                //复制实例信息到其他节点
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
   
   
            tracer.stop();
        }
    }

    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
   
   
        try {
   
   
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
   
   
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    //续约心跳,获取服务的状态
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    //根据名字和id获取实例InstanceInfo
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    //调用PeerEurekaNode.heartbeat方法
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
   
   
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

这里跟服务注册流程差不多,做了如下事情

  • 1找到所有的PeerEurekaNode 节点,然后循环每一个PeerEurekaNode 都要进行实例信息复制,当然自己不用复制,
  • 2.根据Action类型判断是什么动作(Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;),当前是Heartbeat
  • 3.调用PeerEurekaNode 的heartbeat方法同步心跳信息到Eureka 节点

PeerEurekaNode#heartbeat 源码如下

 /**
     发送心跳信息到Eureka节点,如果服务没注册,先注册,然后再发送
     * Send the heartbeat information of an instance to the node represented by
     * this class. If the instance does not exist the node, the instance
     * registration information is sent again to the peer node.
     *
     * @param appName 服务名
     *            the application name of the instance.
     * @param id  服务实例id
     *            the unique identifier of the instance.
     * @param info 实例信息对象
     *            the instance info {@link InstanceInfo} of the instance.
     * @param overriddenStatus 服务状态
     *            the overridden status information if any of the instance.
     * @throws Throwable
     */
    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
   
   

        if (primeConnection) {
   
   
         //根据调用传入的参数,这里是false,不会执行下面的代码,除非是走Aws亚马逊才是true
            // We do not care about the result for priming request.
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        //创建复制任务
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
   
   
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
   
   
                //执行心跳发送,参数:服务名,服务id,实例信息InstanceInfo ,服务状态
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
   
   
                //如果发送失败
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
   
   
                    logger.warn("{}: missing entry.", getTaskName());
                    if (info != null) {
   
   
                        logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                        //这里调用注册方法
                        register(info);
                    }
                } else if (config.shouldSyncWhenTimestampDiffers()) {
   
   
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
   
   
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        //分批调度程序,执行任务
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }

PeerEurekaNode代表了Eureka节点,heartbeat方法做了如下事情

  • 1创建一个InstanceReplicationTask实例复制任务
  • 2.任务通过调用replicationClient.sendHeartBeat 发送心跳信息
  • 3.如果发送失败,出现404错误会先走注册流程
  • 4.调用分配调度器执行任务

replicationClient.sendHeartBeat其实调用的是JerseyReplicationClient.sendHeartBeat发送心跳

 @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
   
   
        //发送地址
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
   
   
            //封装http参数
            WebResource webResource = jerseyClient.getClient().resource(serviceUrl)
                    .path(urlPath)
                    //状态
                    .queryParam("status", info.getStatus().toString())
                    //最后更新时间
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
   
   
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            //发送put请求
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
            InstanceInfo infoFromPeer = null;
            if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) {
   
   
                infoFromPeer = response.getEntity(InstanceInfo.class);
            }
            return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
        } finally {
   
   
            if (logger.isDebugEnabled()) {
   
   
                logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
   
   
                response.close();
            }
        }
    }

到这里就结束了,最后总结一下

总结

在这里插入图片描述

相关文章
|
4天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
16 2
|
20天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
101 1
|
10天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
36 9
|
7天前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
37 3
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
110 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
127 9
|
1月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
1月前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
30 1