系列文章目录
二.SpringCloud源码剖析-Eureka Client 初始化过程
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
什么是服务续约
EureakClient会定时向EureakServer发送续约心跳(默认30s/次) ,来告诉EurekaServer自己的健康状况,默认情况下3次续约失败(90s),EurekaServer考虑剔除续约失败的客户端服务
初始化定时任务
在EuerakClient中依然是通过scheduler定时实现,在com.netflix.discovery.DiscoveryClient#initScheduledTasks
中进行初始化,源码如下
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
//如果开启服务注册
if (clientConfig.shouldRegisterWithEureka()) {
//续约心跳 30s/次
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
//心跳任务
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread() //HeartbeatThread心跳的线程
),
renewalIntervalInSecs, TimeUnit.SECONDS);//心跳时间30s/次
...省略...
/**
//心跳任务,本身是一个线程对象
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
//调用renew()方法续约
if (renew()) {
//记录最后续约成功时间
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
服务续约线程HeartbeatThread
这里我们看到,通过scheduler定时执行HeartbeatThread
线程,在HeartbeatThread
中又调用了Discovery.renew()
方法执行服务续约,源码如下
/**
使用Rest请求像Eureak进行服务续约
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
//Http相应对象
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//使用eurekaTransport得到http客户端,发起心跳请求
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
//如果续约返回404,尝试重新发起服务注册,并设置Dirty
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
这里依然使用的是eurekaTransport得到一个EurekaHttpClient实例,然后通过EurekaHttpClientDecorator装饰器执行sendHeartBeat方法,源码如下
public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
...省略...
//发送心跳
@Override
public EurekaHttpResponse<InstanceInfo>
sendHeartBeat(final String appName, //服务名
final String id,//服务ID
final InstanceInfo info,//服务注册对象
final InstanceStatus overriddenStatus) {
//服务注册状态
return execute(new RequestExecutor<InstanceInfo>() {
@Override
public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
//发送心跳
return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public RequestType getRequestType() {
return RequestType.SendHeartBeat;
}
});
}
通过EurekaHttpClientDecorator
装饰器会先后会执行RetryableEurekaHttpClient
(失败重试),RedirectingEurekaHttpClient
(重定向到不同的EurekaServer)MetricsCollectingEurekaHttpClient
(统计执行情况)针对于各种情况的Http客户端,然后
最终通过AbstractJerseyEurekaHttpClient
(JerseyApplicationClient),使用jerseyClient发起心跳请求
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
//webResource是对Web请求的封装
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
//服务状态 UP
.queryParam("status", info.getStatus().toString())
//最后刷新时间
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送put请求
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
这里EureakClient会向EureakServer发送自己的服务状态,以及服务最后续约时间,使用的是PUT发起Rest请求。
总结
- DiscoveryClient.initScheduledTasks初始化续约心跳定时任务,30s/次执行HeartbeatThread线程发送续约请求
- HeartbeatThread调用DiscoveryClient.renew()续约
- renew方法中使用eurekaTransport得到EureakaHttpClient实例EurekaHttpClientDecorator 装饰器执行请求
- EurekaHttpClientDecorator 调用JerseyApplicationClient向EureakServer发送Rest请求,把服务状态和最后的续约时间当做参数
下一章推荐《Eureka Server服务续约》