什么是服务续约
EureakClient会定时向EureakServer发送续约心跳(默认30s/次) ,来告诉EurekaServer自己的健康状况,默认情况下3次续约失败(90s),EurekaServer考虑剔除续约失败的客户端服务
初始化定时任务
在EuerakClient中依然是通过scheduler定时实现,在com.netflix.discovery.DiscoveryClient#initScheduledTasks中进行初始化,源码如下
/*** Initializes all scheduled tasks.*/privatevoidinitScheduledTasks() { //如果开启服务注册if (clientConfig.shouldRegisterWithEureka()) { //续约心跳 30s/次intrenewalIntervalInSecs=instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); intexpBackOffBound=clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: "+"renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer//心跳任务scheduler.schedule( newTimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, newHeartbeatThread() //HeartbeatThread心跳的线程 ), renewalIntervalInSecs, TimeUnit.SECONDS);//心跳时间30s/次 ...省略... /**//心跳任务,本身是一个线程对象* The heartbeat task that renews the lease in the given intervals.*/privateclassHeartbeatThreadimplementsRunnable { publicvoidrun() { //调用renew()方法续约if (renew()) { //记录最后续约成功时间lastSuccessfulHeartbeatTimestamp=System.currentTimeMillis(); } } }
服务续约线程HeartbeatThread
这里我们看到,通过scheduler定时执行HeartbeatThread线程,在HeartbeatThread中又调用了Discovery.renew()方法执行服务续约,源码如下
/**使用Rest请求像Eureak进行服务续约* Renew with the eureka service by making the appropriate REST call*/booleanrenew() { //Http相应对象EurekaHttpResponse<InstanceInfo>httpResponse; try { //使用eurekaTransport得到http客户端,发起心跳请求httpResponse=eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX+"{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() ==404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX+"{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); longtimestamp=instanceInfo.setIsDirtyWithTime(); //如果续约返回404,尝试重新发起服务注册,并设置Dirtybooleansuccess=register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } returnsuccess; } returnhttpResponse.getStatusCode() ==200; } catch (Throwablee) { logger.error(PREFIX+"{} - was unable to send heartbeat!", appPathIdentifier, e); returnfalse; } }
这里依然使用的是eurekaTransport得到一个EurekaHttpClient实例,然后通过EurekaHttpClientDecorator装饰器执行sendHeartBeat方法,源码如下
publicabstractclassEurekaHttpClientDecoratorimplementsEurekaHttpClient { ...省略... //发送心跳publicEurekaHttpResponse<InstanceInfo>sendHeartBeat(finalStringappName, //服务名finalStringid,//服务IDfinalInstanceInfoinfo,//服务注册对象finalInstanceStatusoverriddenStatus) { //服务注册状态returnexecute(newRequestExecutor<InstanceInfo>() { publicEurekaHttpResponse<InstanceInfo>execute(EurekaHttpClientdelegate) { //发送心跳returndelegate.sendHeartBeat(appName, id, info, overriddenStatus); } publicRequestTypegetRequestType() { returnRequestType.SendHeartBeat; } }); }
通过EurekaHttpClientDecorator 装饰器会先后会执行RetryableEurekaHttpClient(失败重试),RedirectingEurekaHttpClient(重定向到不同的EurekaServer)MetricsCollectingEurekaHttpClient(统计执行情况)针对于各种情况的Http客户端,然后
最终通过AbstractJerseyEurekaHttpClient(JerseyApplicationClient),使用jerseyClient发起心跳请求
publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient { publicEurekaHttpResponse<InstanceInfo>sendHeartBeat(StringappName, Stringid, InstanceInfoinfo, InstanceStatusoverriddenStatus) { StringurlPath="apps/"+appName+'/'+id; ClientResponseresponse=null; try { //webResource是对Web请求的封装WebResourcewebResource=jerseyClient.resource(serviceUrl) .path(urlPath) //服务状态 UP .queryParam("status", info.getStatus().toString()) //最后刷新时间 .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus!=null) { webResource=webResource.queryParam("overriddenstatus", overriddenStatus.name()); } BuilderrequestBuilder=webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //发送put请求response=requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo>eurekaResponseBuilder=anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } returneurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response==null?"N/A" : response.getStatus()); } if (response!=null) { response.close(); } } }
这里EureakClient会向EureakServer发送自己的服务状态,以及服务最后续约时间,使用的是PUT发起Rest请求。
总结
- DiscoveryClient.initScheduledTasks初始化续约心跳定时任务,30s/次执行HeartbeatThread线程发送续约请求
- HeartbeatThread调用DiscoveryClient.renew()续约
- renew方法中使用eurekaTransport得到EureakaHttpClient实例EurekaHttpClientDecorator 装饰器执行请求
- EurekaHttpClientDecorator 调用JerseyApplicationClient向EureakServer发送Rest请求,把服务状态和最后的续约时间当做参数
