SpringCloud源码剖析-Eureka Server服务续约

简介: Eureka客户端定时向服务端发起服务续约的请求,也是通过ServeltContainer来接待请求,请求中携带了当前续约服务的状态和最后修改时间,ServeltContainer最终会调用 InstanceResource来处理请求,

前言

前一章节我们分析了一下Eureka Server实现服务注册的流程,这一章节我们把Eureka Server服务续约流程看了,请结合《Eureka Client服务续约》


Eureka Server 服务续约流程

Eureka客户端定时向服务端发起服务续约的请求,也是通过ServeltContainer来接待请求,请求中携带了当前续约服务的状态和最后修改时间,ServeltContainer最终会调用 InstanceResource来处理请求,源码如下

@Produces({"application/xml", "application/json"})
publicclassInstanceResource {
    ...省略...
/**来自客户端实例的续订租约的put请求* A put request for renewing lease from a client instance.** @param isReplication*          请求头isreplicated 是否是从其他节点复制的*            a header parameter containing information whether this is*            replicated from other nodes.* @param overriddenStatus*          覆盖状态*            overridden status if any.* @param status :续约的服务状态,一般都是UP*            the {@link InstanceStatus} of the instance.* @param lastDirtyTimestamp 此实例信息更新的最后时间戳*            last timestamp when this instance information was updated.指示操作是成功还是失败的响应。* @return response indicating whether the operation was a success or*         failure.*/@PUTpublicResponserenewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
@QueryParam("overriddenstatus") StringoverriddenStatus,
@QueryParam("status") Stringstatus,
@QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp) {
//是否是其他节点复制booleanisFromReplicaNode="true".equals(isReplication);
//执行续约流程booleanisSuccess=registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a registerif (!isSuccess) {
//没续约成功,返回NOT_FOUND状态logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
returnResponse.status(Status.NOT_FOUND).build();
        }
// 检查是否需要根据最后更新间戳进行同步,客户端实例可能已更改了某些值// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponseresponse=null;
if (lastDirtyTimestamp!=null&&serverConfig.shouldSyncWhenTimestampDiffers()) {
response=this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() ==Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus!=null)
&&!(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&&isFromReplicaNode) {
//修改服务端维护的注册表中注册的服务的覆盖的状态registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
response=Response.ok().build();
        }
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
returnresponse;
    }
}

这里方法中调用了InstanceRegistry#renew的续约方法,跟踪下去

@Overridepublicbooleanrenew(finalStringappName, finalStringserverId,
booleanisReplication) {
log("renew "+appName+" serverId "+serverId+", isReplication {}"+isReplication);
//获取服务端本地存储的服务注册表,//其实是从一个AbstractInstanceRegistry中名字叫 registry的map中获取//因为服务注册就是存储到这个registry的map中的List<Application>applications=getSortedApplications();
for (Applicationinput : applications) {
//找到当前续约服务名对应的服务if (input.getName().equals(appName)) {
InstanceInfoinstance=null;
for (InstanceInfoinfo : input.getInstances()) {
//找到服务名相同,服务id也相同的服务if (info.getId().equals(serverId)) {
instance=info;
break;
                }
            }
//抛出续约事件publishEvent(newEurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
        }
    }
//调用super的续约方法进行续约returnsuper.renew(appName, serverId, isReplication);
}

这里做了如下事情

  1. 找到服务端维护的服务注册表registry(Map)
  2. 根据续约参数中的服务名,以及服务Id,找到续约的服务
  3. 抛出续约事件
  4. 调用父类的续约逻辑

我们继续看它父类的续约方法PeerAwareInstanceRegistryImpl#renew

/** (non-Javadoc)** @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String,* java.lang.String, long, boolean)*/publicbooleanrenew(finalStringappName, finalStringid, finalbooleanisReplication) {
//调用super的续约方法if (super.renew(appName, id, isReplication)) {
//续约的实例信息复制到其他的Eureka节点//注意这里是传入的Action是Action.Heartbeat 心跳replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
returntrue;
    }
returnfalse;
}

这里做了两个事情

  • 1是调用super的续约方法
  • 2是续约成功后,调用replicateToPeers把续约成功的实例信息同步到其他的Eureka节点上,这个方法我们在分析注册流程的时候已经看过,不管是注册,续约,取消注册,状态改变等操作都要执行replicateToPeers进行Eureka集群节点之间的数据同步.

我们重点跟踪一下super.renew方法即:AbstractInstanceRegistry#renew

/*** Marks the given instance of the given app name as renewed, and also marks whether it originated from* replication.** @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)*///appName应用名,id实例ID,isReplication是否是复制publicbooleanrenew(StringappName, Stringid, booleanisReplication) {
//增加给定统计信息的计数器,EurekaMonitors.counter加一,用作Eureka监控RENEW.increment(isReplication);
//从服务端维护的服务实例注册表中获取到当前续约服务名对应的服务信息Map<String, Lease<InstanceInfo>>gMap=registry.get(appName);
Lease<InstanceInfo>leaseToRenew=null;
if (gMap!=null) {
//获取服务的租约对象leaseToRenew=gMap.get(id);
    }
//如果没找到服务,返回falseif (leaseToRenew==null) {
//续约服务没找到的计数器增加RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
returnfalse;
    } else {
//根据租约对象获取到服务的实例信息对象InstanceInfoInstanceInfoinstanceInfo=leaseToRenew.getHolder();
//如果服务实例信息不为空if (instanceInfo!=null) {
// touchASGCache(instanceInfo.getASGName());//获取服务的状态InstanceStatusoverriddenInstanceStatus=this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
//如果服务的状态是UNKNOWN,说明服务可能被删除了,需要重新注册,返回falseif (overriddenInstanceStatus==InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+"; re-register required", instanceInfo.getId());
//续约服务没找到的计数器增加RENEW_NOT_FOUND.increment(isReplication);
returnfalse;
            }
//实例状态与实例的OverriddenStatus覆盖实例状态不同。因此将状态设置为OverriddenStatus状态if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "+"Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
//修改状态为overriddenInstanceStatus,instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
//增加最后更新计数renewsLastMin.increment();
//执行续约,com.netflix.eureka.lease.Lease#renewleaseToRenew.renew();
//续约成功returntrue;
    }
}

这个方法又做了什么呢

  • 1是增加续约计数,Eureka监控用的,然后根据续约的服务名在本地注册表registry(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry)获取服务的Lease租约对象,然后根据租约对象获取InstanceInfo服务注册实例
  • 2如果InstanceInfo为空,说明服务可能被删除,增加续约失败计数后,返回false
  • 3如果当前实例的状态和overriddenInstanceStatus状态不一致,就修改实例状态为overriddenInstanceStatus
  • 4增加最后更新计数,然后调用 Lease#renew方法修改lastUpdateTimestamp,即最后续约时间更新,最后续约时间更新了,Eureka Server就知道服务实例续约成功,否则到了最后续约失效时间服务端会考虑剔除服务。

当然,服务续约成功之后还会回到 replicateToPeers 方法中把续约的服务信息同步到其他的Eureka节点,我们回到replicateToPeers方法看一下

privatevoidreplicateToPeers(Actionaction, StringappName, Stringid,
InstanceInfoinfo/* optional */,
InstanceStatusnewStatus/* optional */, booleanisReplication) {
Stopwatchtracer=action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
        }
// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes==Collections.EMPTY_LIST||isReplication) {
return;
        }
//循环除开自己的所有Eureka节点都要同步for (finalPeerEurekaNodenode : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
            }
//复制实例信息到其他节点replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
tracer.stop();
    }
}
/*** Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/privatevoidreplicateInstanceActionsToPeers(Actionaction, StringappName,
Stringid, InstanceInfoinfo, InstanceStatusnewStatus,
PeerEurekaNodenode) {
try {
InstanceInfoinfoFromRegistry=null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
caseCancel:
node.cancel(appName, id);
break;
caseHeartbeat:
//续约心跳,获取服务的状态InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id);
//根据名字和id获取实例InstanceInfoinfoFromRegistry=getInstanceByAppAndId(appName, id, false);
//调用PeerEurekaNode.heartbeat方法node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
caseRegister:
node.register(info);
break;
caseStatusUpdate:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
caseDeleteStatusOverride:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
        }
    } catch (Throwablet) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

这里跟服务注册流程差不多,做了如下事情

  • 1找到所有的PeerEurekaNode 节点,然后循环每一个PeerEurekaNode 都要进行实例信息复制,当然自己不用复制,
  • 2.根据Action类型判断是什么动作(Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;),当前是Heartbeat
  • 3.调用PeerEurekaNode 的heartbeat方法同步心跳信息到Eureka 节点

PeerEurekaNode#heartbeat 源码如下

/**发送心跳信息到Eureka节点,如果服务没注册,先注册,然后再发送* Send the heartbeat information of an instance to the node represented by* this class. If the instance does not exist the node, the instance* registration information is sent again to the peer node.** @param appName 服务名*            the application name of the instance.* @param id  服务实例id*            the unique identifier of the instance.* @param info 实例信息对象*            the instance info {@link InstanceInfo} of the instance.* @param overriddenStatus 服务状态*            the overridden status information if any of the instance.* @throws Throwable*/publicvoidheartbeat(finalStringappName, finalStringid,
finalInstanceInfoinfo, finalInstanceStatusoverriddenStatus,
booleanprimeConnection) throwsThrowable {
if (primeConnection) {
//根据调用传入的参数,这里是false,不会执行下面的代码,除非是走Aws亚马逊才是true// We do not care about the result for priming request.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
    }
//创建复制任务ReplicationTaskreplicationTask=newInstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@OverridepublicEurekaHttpResponse<InstanceInfo>execute() throwsThrowable {
//执行心跳发送,参数:服务名,服务id,实例信息InstanceInfo ,服务状态returnreplicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }
@OverridepublicvoidhandleFailure(intstatusCode, ObjectresponseEntity) throwsThrowable {
//如果发送失败super.handleFailure(statusCode, responseEntity);
if (statusCode==404) {
logger.warn("{}: missing entry.", getTaskName());
if (info!=null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
//这里调用注册方法register(info);
                }
            } elseif (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfopeerInstanceInfo= (InstanceInfo) responseEntity;
if (peerInstanceInfo!=null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
longexpiryTime=System.currentTimeMillis() +getLeaseRenewalOf(info);
//分批调度程序,执行任务batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

PeerEurekaNode代表了Eureka节点,heartbeat方法做了如下事情

  • 1创建一个InstanceReplicationTask实例复制任务
  • 2.任务通过调用replicationClient.sendHeartBeat 发送心跳信息
  • 3.如果发送失败,出现404错误会先走注册流程
  • 4.调用分配调度器执行任务

replicationClient.sendHeartBeat其实调用的是JerseyReplicationClient.sendHeartBeat发送心跳

@OverridepublicEurekaHttpResponse<InstanceInfo>sendHeartBeat(StringappName, Stringid, InstanceInfoinfo, InstanceStatusoverriddenStatus) {
//发送地址StringurlPath="apps/"+appName+'/'+id;
ClientResponseresponse=null;
try {
//封装http参数WebResourcewebResource=jerseyClient.getClient().resource(serviceUrl)
            .path(urlPath)
//状态            .queryParam("status", info.getStatus().toString())
//最后更新时间            .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus!=null) {
webResource=webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
BuilderrequestBuilder=webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送put请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
InstanceInfoinfoFromPeer=null;
if (response.getStatus() ==Status.CONFLICT.getStatusCode() &&response.hasEntity()) {
infoFromPeer=response.getEntity(InstanceInfo.class);
        }
returnanEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
if (logger.isDebugEnabled()) {
logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response==null?"N/A" : response.getStatus());
        }
if (response!=null) {
response.close();
        }
    }
}

到这里就结束了,最后总结一下

总结

目录
相关文章
|
7天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
23 2
|
23天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
103 1
|
13天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
39 9
|
11天前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
42 3
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
111 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
128 9
|
1月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
1月前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
36 1
下一篇
无影云桌面