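Preface
The previous chapter analyzed how Eureka Server handles service registration. This chapter walks through the Eureka Server service renewal flow; read it alongside "Eureka Client Service Renewal".
Eureka Server service renewal flow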
The Eureka client periodically sends a renewal request to the server. As with registration, the request is received by ServletContainer. It carries the current status of the renewing service and its last-modified timestamp. ServletContainer eventually hands the request to InstanceResource, whose source is shown below.
@Produces({"application/xml", "application/json"})
public class InstanceResource {

    ...omitted...

    /**
     * A put request for renewing lease from a client instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @param overriddenStatus
     *            overridden status if any.
     * @param status
     *            the {@link InstanceStatus} of the instance (the status being
     *            renewed, normally UP).
     * @param lastDirtyTimestamp
     *            last timestamp when this instance information was updated.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        // Was this request replicated from another node?
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Run the renewal
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            // Renewal failed: answer NOT_FOUND
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                // Update the overridden status of the instance held in the server-side registry
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}
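The branching in renewLease can be reduced to a small decision function. The sketch below is not Eureka code: the class name, the integer status codes, and the validationStatus parameter (standing in for whatever validateDirtyTimestamp would return) are assumptions made for illustration.

```java
// Simplified model of the status renewLease answers with; not Eureka's classes.
final class RenewLeaseDecisionSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    /**
     * @param renewed            result of registry.renew(...)
     * @param lastDirtyTimestamp value sent by the client, or null if absent
     * @param syncWhenDiffers    stands in for serverConfig.shouldSyncWhenTimestampDiffers()
     * @param validationStatus   hypothetical: the status validateDirtyTimestamp would return
     */
    static int statusFor(boolean renewed, String lastDirtyTimestamp,
                         boolean syncWhenDiffers, int validationStatus) {
        if (!renewed) {
            return NOT_FOUND;        // lease unknown: the client is asked to register
        }
        if (lastDirtyTimestamp != null && syncWhenDiffers) {
            return validationStatus; // defer to the dirty-timestamp check
        }
        return OK;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(false, "1", true, OK));
        System.out.println(statusFor(true, null, true, NOT_FOUND));
    }
}
```

The point of the sketch is the ordering: the lease lookup decides first, and the timestamp comparison only runs for a lease that was actually renewed.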
This method calls InstanceRegistry#renew. Following it down:
public boolean renew(final String appName, final String serverId, boolean isReplication) {
    log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
    // Read the registry held locally by the server.
    // It comes from the map named "registry" in AbstractInstanceRegistry,
    // which is where registration stored the instances.
    List<Application> applications = getSortedApplications();
    for (Application input : applications) {
        // Find the application whose name matches the renewing service
        if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) {
                // Find the instance with the same app name and the same instance id
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            // Publish the renewal event
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
            break;
        }
    }
    // Delegate to the parent class to perform the renewal
    return super.renew(appName, serverId, isReplication);
}
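The method does the following:
- Reads the registry (a Map) maintained by the server
- Uses the app name and instance id from the renewal request to find the renewing instance
- Publishes a renewal event
- Calls the parent class's renewal logic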
Next, the parent's renewal method, PeerAwareInstanceRegistryImpl#renew:
/*
 * (non-Javadoc)
 *
 * @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String,
 * java.lang.String, long, boolean)
 */
public boolean renew(final String appName, final String id, final boolean isReplication) {
    // Call super's renewal method
    if (super.renew(appName, id, isReplication)) {
        // Replicate the renewed instance to the other Eureka nodes.
        // Note that the action passed in is Action.Heartbeat.
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
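Two things happen here:
- 1. It calls super's renewal method.
- 2. If the renewal succeeds, it calls replicateToPeers to synchronize the renewed instance to the other Eureka nodes. We already saw this method in the registration flow: registration, renewal, cancellation, status changes and similar operations all go through replicateToPeers to keep the nodes of a Eureka cluster in sync.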
The part worth following closely is super.renew, that is, AbstractInstanceRegistry#renew:
/**
 * Marks the given instance of the given app name as renewed, and also marks whether it originated from
 * replication.
 *
 * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
 */
// appName: application name, id: instance id, isReplication: whether this is a replicated request
public boolean renew(String appName, String id, boolean isReplication) {
    // Increment the matching EurekaMonitors counter, used for Eureka monitoring
    RENEW.increment(isReplication);
    // Look up the leases registered under this app name in the server-side registry
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // Get the lease of this instance
        leaseToRenew = gMap.get(id);
    }
    // No lease found: return false
    if (leaseToRenew == null) {
        // Increment the renew-not-found counter
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        // Get the InstanceInfo held by the lease
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        // If the instance info is present
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // Resolve the overridden status of the instance
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            // UNKNOWN means the override may have been deleted; the instance must re-register, so return false
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                // Increment the renew-not-found counter
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // The instance status differs from its overridden status, so adopt the overridden status
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId());
                // Set the status to overriddenInstanceStatus
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // Count this renewal in the renewals-in-the-last-minute meter
        renewsLastMin.increment();
        // Perform the renewal: com.netflix.eureka.lease.Lease#renew
        leaseToRenew.renew();
        // Renewal succeeded
        return true;
    }
}
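What does this method do?
- 1. It increments the renew counter (used for Eureka monitoring), then uses the app name and instance id to look up the instance's Lease in the local registry (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry). If no lease is found, it increments the renew-not-found counter and returns false.
- 2. It gets the InstanceInfo from the lease and resolves its overridden status. If that status is UNKNOWN, the override may have been deleted, so it increments the renew-not-found counter and returns false so that the instance registers again.
- 3. If the instance's current status differs from overriddenInstanceStatus, it sets the instance status to overriddenInstanceStatus.
- 4. It increments the renewals-in-the-last-minute counter and calls Lease#renew, which updates lastUpdateTimestamp, the time of the last renewal. Once that timestamp moves forward, Eureka Server knows the instance renewed successfully; if it does not move before the lease expires, the server will consider evicting the instance.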
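The steps above can be sketched with a stripped-down registry. This is a minimal model, assuming a lease that stores only a timestamp; the class names, the explicit `now` parameter, and the isExpired rule are hypothetical simplifications, and Eureka's real Lease carries more state and its own expiry logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy version of the nested registry map and lease renewal; not Eureka's classes.
final class LeaseRegistrySketch {
    /** Minimal lease: remembers only when it was last renewed. */
    static final class Lease {
        volatile long lastUpdateTimestamp;

        void renew(long now) {
            lastUpdateTimestamp = now;
        }
    }

    // appName -> (instanceId -> lease), the same nesting as the registry map
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long now) {
        Lease lease = new Lease();
        lease.renew(now);
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, lease);
    }

    /** Returns false when no lease exists, which tells the caller to register again. */
    boolean renew(String appName, String id, long now) {
        Map<String, Lease> leases = registry.get(appName);
        Lease lease = (leases == null) ? null : leases.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(now);
        return true;
    }

    /** Hypothetical expiry rule: stale once durationMs passes without a renewal. */
    boolean isExpired(String appName, String id, long now, long durationMs) {
        Map<String, Lease> leases = registry.get(appName);
        Lease lease = (leases == null) ? null : leases.get(id);
        return lease == null || now - lease.lastUpdateTimestamp > durationMs;
    }
}
```

Passing `now` in explicitly keeps the sketch deterministic; the real code reads the system clock inside Lease#renew.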
After a successful renewal, control returns to replicateToPeers, which synchronizes the renewed instance to the other Eureka nodes. Here is that method again:
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // Loop over every Eureka node except this one
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the instance information to the peer
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Renewal heartbeat: look up the overridden status of the instance
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // Get the InstanceInfo by app name and id
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // Call PeerEurekaNode.heartbeat
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
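This is much the same as in the registration flow:
- 1. It iterates over all PeerEurekaNode entries and replicates the instance information to each of them, skipping the current node itself. If the request is already a replication, it returns without forwarding.
- 2. It switches on the Action type (Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride); here the action is Heartbeat.
- 3. It calls PeerEurekaNode's heartbeat method to synchronize the heartbeat to that Eureka node.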
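The two guards in replicateToPeers (do not forward a request that is itself a replication, and do not replicate to yourself) can be shown in isolation. The sketch below is an illustration only: peers are modeled as plain URL strings and the method name targets is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Models which peers a heartbeat would be forwarded to; not Eureka's classes.
final class ReplicationLoopSketch {
    static List<String> targets(List<String> peerUrls, String selfUrl, boolean isReplication) {
        List<String> out = new ArrayList<>();
        // A request that arrived via replication is never forwarded again,
        // otherwise peers would keep bouncing it between each other.
        if (peerUrls.isEmpty() || isReplication) {
            return out;
        }
        for (String url : peerUrls) {
            if (url.equals(selfUrl)) {
                continue; // never replicate to yourself
            }
            out.add(url);
        }
        return out;
    }
}
```

With three nodes a, b and c, a client heartbeat received by a is forwarded to b and c with the replication flag set, so b and c apply it locally and stop there.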
The source of PeerEurekaNode#heartbeat is as follows:
/**
 * Send the heartbeat information of an instance to the node represented by
 * this class. If the instance does not exist the node, the instance
 * registration information is sent again to the peer node.
 *
 * @param appName
 *            the application name of the instance.
 * @param id
 *            the unique identifier of the instance.
 * @param info
 *            the instance info {@link InstanceInfo} of the instance.
 * @param overriddenStatus
 *            the overridden status information if any of the instance.
 * @throws Throwable
 */
public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    if (primeConnection) {
        // The caller above passes false, so this branch is skipped here;
        // according to the author it is only true on the AWS path.
        // We do not care about the result for priming request.
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }
    // Create the replication task
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
            // Send the heartbeat: app name, instance id, InstanceInfo, overridden status
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            // The send failed
            super.handleFailure(statusCode, responseEntity);
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
                if (info != null) {
                    logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                            getTaskName(), info.getId(), info.getStatus());
                    // Fall back to registering the instance on the peer
                    register(info);
                }
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Hand the task to the batching dispatcher
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
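PeerEurekaNode represents one Eureka node. Its heartbeat method does the following:
- 1. Creates an InstanceReplicationTask, the task that replicates the instance.
- 2. The task sends the heartbeat by calling replicationClient.sendHeartBeat.
- 3. If the send fails with a 404, the peer does not know the instance, so the task registers it on the peer first. For other failures, when shouldSyncWhenTimestampDiffers is enabled and the peer returned its own copy, the two copies are synchronized by timestamp.
- 4. Submits the task to the batching dispatcher for execution.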
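The failure branch is the interesting part of that task. Below is a minimal sketch of it under simplifying assumptions: instances are plain id strings, the Peer interface and RecordingPeer class are hypothetical stand-ins for the register and syncInstancesIfTimestampDiffers calls, and the peer's copy is passed in as a nullable string.

```java
import java.util.ArrayList;
import java.util.List;

// Models handleFailure of the heartbeat replication task; not Eureka's classes.
final class HeartbeatFailureSketch {
    /** Hypothetical stand-in for the operations the task can fall back to. */
    interface Peer {
        void register(String instanceId);
        void syncFromPeer(String instanceId, String peerCopy);
    }

    /** Records calls so the chosen branch can be inspected. */
    static final class RecordingPeer implements Peer {
        final List<String> calls = new ArrayList<>();

        public void register(String instanceId) {
            calls.add("register:" + instanceId);
        }

        public void syncFromPeer(String instanceId, String peerCopy) {
            calls.add("sync:" + instanceId);
        }
    }

    static void handleFailure(int statusCode, String instanceId, String peerCopy,
                              boolean syncWhenTimestampDiffers, Peer peer) {
        if (statusCode == 404) {
            // The peer has no such instance: send the registration instead
            if (instanceId != null) {
                peer.register(instanceId);
            }
        } else if (syncWhenTimestampDiffers && peerCopy != null) {
            // The peer answered with its own copy: reconcile the two
            peer.syncFromPeer(instanceId, peerCopy);
        }
    }
}
```

A 404 therefore repairs itself: the next thing the peer receives for that instance is a registration, after which later heartbeats succeed.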
replicationClient.sendHeartBeat actually resolves to JerseyReplicationClient.sendHeartBeat, which sends the heartbeat:
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    // Request path
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        // Build the HTTP request parameters
        WebResource webResource = jerseyClient.getClient().resource(serviceUrl)
                .path(urlPath)
                // status
                .queryParam("status", info.getStatus().toString())
                // last update time
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        // Send the PUT request
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
        InstanceInfo infoFromPeer = null;
        if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) {
            infoFromPeer = response.getEntity(InstanceInfo.class);
        }
        return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
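To see what ends up on the wire, the request target can be assembled by hand. This is a sketch that uses string concatenation in place of the Jersey WebResource builder; the host name in the usage line is made up, and no URL encoding is applied, so it only holds for values without reserved characters.

```java
// Builds the PUT target of a replicated heartbeat; an illustration, not Eureka code.
final class HeartbeatUrlSketch {
    static String heartbeatUrl(String serviceUrl, String appName, String id,
                               String status, long lastDirtyTimestamp, String overriddenStatus) {
        StringBuilder sb = new StringBuilder(serviceUrl);
        if (!serviceUrl.endsWith("/")) {
            sb.append('/');
        }
        sb.append("apps/").append(appName).append('/').append(id)
          .append("?status=").append(status)
          .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        // overriddenstatus is only sent when an override exists
        if (overriddenStatus != null) {
            sb.append("&overriddenstatus=").append(overriddenStatus);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "peer2:8761" is a hypothetical peer address
        System.out.println(heartbeatUrl("http://peer2:8761/eureka/", "ORDER", "i-1",
                "UP", 1700000000000L, null));
    }
}
```

These query parameters are the same ones that InstanceResource#renewLease reads on the receiving node, so a replicated heartbeat enters the peer through the same renewal endpoint as a client heartbeat.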
That completes the renewal flow. To wrap up, here is a summary.
Summary