SpringCloud源码剖析-Eureka Server服务续约

简介: Eureka客户端定时向服务端发起服务续约的请求,也是通过ServeltContainer来接待请求,请求中携带了当前续约服务的状态和最后修改时间,ServeltContainer最终会调用 InstanceResource来处理请求,

前言

前一章节我们分析了一下Eureka Server实现服务注册的流程,这一章节我们把Eureka Server服务续约流程看了,请结合《Eureka Client服务续约》


Eureka Server 服务续约流程

Eureka客户端定时向服务端发起服务续约的请求,也是通过ServeltContainer来接待请求,请求中携带了当前续约服务的状态和最后修改时间,ServeltContainer最终会调用 InstanceResource来处理请求,源码如下

@Produces({"application/xml", "application/json"})
publicclassInstanceResource {
    ...省略...
/**来自客户端实例的续订租约的put请求* A put request for renewing lease from a client instance.** @param isReplication*          请求头isreplicated 是否是从其他节点复制的*            a header parameter containing information whether this is*            replicated from other nodes.* @param overriddenStatus*          覆盖状态*            overridden status if any.* @param status :续约的服务状态,一般都是UP*            the {@link InstanceStatus} of the instance.* @param lastDirtyTimestamp 此实例信息更新的最后时间戳*            last timestamp when this instance information was updated.指示操作是成功还是失败的响应。* @return response indicating whether the operation was a success or*         failure.*/@PUTpublicResponserenewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
@QueryParam("overriddenstatus") StringoverriddenStatus,
@QueryParam("status") Stringstatus,
@QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp) {
//是否是其他节点复制booleanisFromReplicaNode="true".equals(isReplication);
//执行续约流程booleanisSuccess=registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a registerif (!isSuccess) {
//没续约成功,返回NOT_FOUND状态logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
returnResponse.status(Status.NOT_FOUND).build();
        }
// 检查是否需要根据最后更新间戳进行同步,客户端实例可能已更改了某些值// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponseresponse=null;
if (lastDirtyTimestamp!=null&&serverConfig.shouldSyncWhenTimestampDiffers()) {
response=this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() ==Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus!=null)
&&!(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&&isFromReplicaNode) {
//修改服务端维护的注册表中注册的服务的覆盖的状态registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
response=Response.ok().build();
        }
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
returnresponse;
    }
}

这里方法中调用了InstanceRegistry#renew的续约方法,跟踪下去

@Overridepublicbooleanrenew(finalStringappName, finalStringserverId,
booleanisReplication) {
log("renew "+appName+" serverId "+serverId+", isReplication {}"+isReplication);
//获取服务端本地存储的服务注册表,//其实是从一个AbstractInstanceRegistry中名字叫 registry的map中获取//因为服务注册就是存储到这个registry的map中的List<Application>applications=getSortedApplications();
for (Applicationinput : applications) {
//找到当前续约服务名对应的服务if (input.getName().equals(appName)) {
InstanceInfoinstance=null;
for (InstanceInfoinfo : input.getInstances()) {
//找到服务名相同,服务id也相同的服务if (info.getId().equals(serverId)) {
instance=info;
break;
                }
            }
//抛出续约事件publishEvent(newEurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
        }
    }
//调用super的续约方法进行续约returnsuper.renew(appName, serverId, isReplication);
}

这里做了如下事情

  1. 找到服务端维护的服务注册表registry(Map)
  2. 根据续约参数中的服务名,以及服务Id,找到续约的服务
  3. 抛出续约事件
  4. 调用父类的续约逻辑

我们继续看它父类的续约方法PeerAwareInstanceRegistryImpl#renew

/** (non-Javadoc)** @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String,* java.lang.String, long, boolean)*/publicbooleanrenew(finalStringappName, finalStringid, finalbooleanisReplication) {
//调用super的续约方法if (super.renew(appName, id, isReplication)) {
//续约的实例信息复制到其他的Eureka节点//注意这里是传入的Action是Action.Heartbeat 心跳replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
returntrue;
    }
returnfalse;
}

这里做了两个事情

  • 1是调用super的续约方法
  • 2是续约成功后,调用replicateToPeers把续约成功的实例信息同步到其他的Eureka节点上,这个方法我们在分析注册流程的时候已经看过,不管是注册,续约,取消注册,状态改变等操作都要执行replicateToPeers进行Eureka集群节点之间的数据同步.

我们重点跟踪一下super.renew方法即:AbstractInstanceRegistry#renew

/*** Marks the given instance of the given app name as renewed, and also marks whether it originated from* replication.** @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)*///appName应用名,id实例ID,isReplication是否是复制publicbooleanrenew(StringappName, Stringid, booleanisReplication) {
//增加给定统计信息的计数器,EurekaMonitors.counter加一,用作Eureka监控RENEW.increment(isReplication);
//从服务端维护的服务实例注册表中获取到当前续约服务名对应的服务信息Map<String, Lease<InstanceInfo>>gMap=registry.get(appName);
Lease<InstanceInfo>leaseToRenew=null;
if (gMap!=null) {
//获取服务的租约对象leaseToRenew=gMap.get(id);
    }
//如果没找到服务,返回falseif (leaseToRenew==null) {
//续约服务没找到的计数器增加RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
returnfalse;
    } else {
//根据租约对象获取到服务的实例信息对象InstanceInfoInstanceInfoinstanceInfo=leaseToRenew.getHolder();
//如果服务实例信息不为空if (instanceInfo!=null) {
// touchASGCache(instanceInfo.getASGName());//获取服务的状态InstanceStatusoverriddenInstanceStatus=this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
//如果服务的状态是UNKNOWN,说明服务可能被删除了,需要重新注册,返回falseif (overriddenInstanceStatus==InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+"; re-register required", instanceInfo.getId());
//续约服务没找到的计数器增加RENEW_NOT_FOUND.increment(isReplication);
returnfalse;
            }
//实例状态与实例的OverriddenStatus覆盖实例状态不同。因此将状态设置为OverriddenStatus状态if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "+"Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
//修改状态为overriddenInstanceStatus,instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
//增加最后更新计数renewsLastMin.increment();
//执行续约,com.netflix.eureka.lease.Lease#renewleaseToRenew.renew();
//续约成功returntrue;
    }
}

这个方法又做了什么呢

  • 1是增加续约计数,Eureka监控用的,然后根据续约的服务名在本地注册表registry(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry)获取服务的Lease租约对象,然后根据租约对象获取InstanceInfo服务注册实例
  • 2如果InstanceInfo为空,说明服务可能被删除,增加续约失败计数后,返回false
  • 3如果当前实例的状态和overriddenInstanceStatus状态不一致,就修改实例状态为overriddenInstanceStatus
  • 4增加最后更新计数,然后调用 Lease#renew方法修改lastUpdateTimestamp,即最后续约时间更新,最后续约时间更新了,Eureka Server就知道服务实例续约成功,否则到了最后续约失效时间服务端会考虑剔除服务。

当然,服务续约成功之后还会回到 replicateToPeers 方法中把续约的服务信息同步到其他的Eureka节点,我们回到replicateToPeers方法看一下

privatevoidreplicateToPeers(Actionaction, StringappName, Stringid,
InstanceInfoinfo/* optional */,
InstanceStatusnewStatus/* optional */, booleanisReplication) {
Stopwatchtracer=action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
        }
// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes==Collections.EMPTY_LIST||isReplication) {
return;
        }
//循环除开自己的所有Eureka节点都要同步for (finalPeerEurekaNodenode : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
            }
//复制实例信息到其他节点replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
tracer.stop();
    }
}
/*** Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/privatevoidreplicateInstanceActionsToPeers(Actionaction, StringappName,
Stringid, InstanceInfoinfo, InstanceStatusnewStatus,
PeerEurekaNodenode) {
try {
InstanceInfoinfoFromRegistry=null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
caseCancel:
node.cancel(appName, id);
break;
caseHeartbeat:
//续约心跳,获取服务的状态InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id);
//根据名字和id获取实例InstanceInfoinfoFromRegistry=getInstanceByAppAndId(appName, id, false);
//调用PeerEurekaNode.heartbeat方法node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
caseRegister:
node.register(info);
break;
caseStatusUpdate:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
caseDeleteStatusOverride:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
        }
    } catch (Throwablet) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

这里跟服务注册流程差不多,做了如下事情

  • 1找到所有的PeerEurekaNode 节点,然后循环每一个PeerEurekaNode 都要进行实例信息复制,当然自己不用复制,
  • 2.根据Action类型判断是什么动作(Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;),当前是Heartbeat
  • 3.调用PeerEurekaNode 的heartbeat方法同步心跳信息到Eureka 节点

PeerEurekaNode#heartbeat 源码如下

/**发送心跳信息到Eureka节点,如果服务没注册,先注册,然后再发送* Send the heartbeat information of an instance to the node represented by* this class. If the instance does not exist the node, the instance* registration information is sent again to the peer node.** @param appName 服务名*            the application name of the instance.* @param id  服务实例id*            the unique identifier of the instance.* @param info 实例信息对象*            the instance info {@link InstanceInfo} of the instance.* @param overriddenStatus 服务状态*            the overridden status information if any of the instance.* @throws Throwable*/publicvoidheartbeat(finalStringappName, finalStringid,
finalInstanceInfoinfo, finalInstanceStatusoverriddenStatus,
booleanprimeConnection) throwsThrowable {
if (primeConnection) {
//根据调用传入的参数,这里是false,不会执行下面的代码,除非是走Aws亚马逊才是true// We do not care about the result for priming request.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
    }
//创建复制任务ReplicationTaskreplicationTask=newInstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@OverridepublicEurekaHttpResponse<InstanceInfo>execute() throwsThrowable {
//执行心跳发送,参数:服务名,服务id,实例信息InstanceInfo ,服务状态returnreplicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }
@OverridepublicvoidhandleFailure(intstatusCode, ObjectresponseEntity) throwsThrowable {
//如果发送失败super.handleFailure(statusCode, responseEntity);
if (statusCode==404) {
logger.warn("{}: missing entry.", getTaskName());
if (info!=null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
//这里调用注册方法register(info);
                }
            } elseif (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfopeerInstanceInfo= (InstanceInfo) responseEntity;
if (peerInstanceInfo!=null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
longexpiryTime=System.currentTimeMillis() +getLeaseRenewalOf(info);
//分批调度程序,执行任务batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

PeerEurekaNode代表了Eureka节点,heartbeat方法做了如下事情

  • 1创建一个InstanceReplicationTask实例复制任务
  • 2.任务通过调用replicationClient.sendHeartBeat 发送心跳信息
  • 3.如果发送失败,出现404错误会先走注册流程
  • 4.调用分配调度器执行任务

replicationClient.sendHeartBeat其实调用的是JerseyReplicationClient.sendHeartBeat发送心跳

@OverridepublicEurekaHttpResponse<InstanceInfo>sendHeartBeat(StringappName, Stringid, InstanceInfoinfo, InstanceStatusoverriddenStatus) {
//发送地址StringurlPath="apps/"+appName+'/'+id;
ClientResponseresponse=null;
try {
//封装http参数WebResourcewebResource=jerseyClient.getClient().resource(serviceUrl)
            .path(urlPath)
//状态            .queryParam("status", info.getStatus().toString())
//最后更新时间            .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus!=null) {
webResource=webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
BuilderrequestBuilder=webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送put请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
InstanceInfoinfoFromPeer=null;
if (response.getStatus() ==Status.CONFLICT.getStatusCode() &&response.hasEntity()) {
infoFromPeer=response.getEntity(InstanceInfo.class);
        }
returnanEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
if (logger.isDebugEnabled()) {
logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response==null?"N/A" : response.getStatus());
        }
if (response!=null) {
response.close();
        }
    }
}

到这里就结束了,最后总结一下

总结

Heron
+关注
目录
打赏
0
0
0
0
15
分享
相关文章
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
301 70
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
214 2
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
350 2
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
231 7
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
163 2
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
161 9
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
201 3
|
9月前
|
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
351 5
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
165 4
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问