As usual, this article focuses on the underlying principles rather than on how to use Eureka.
I. Eureka Principles
1 Service Governance
The overall service-governance model of Eureka is shown below:
Eureka's service-governance flow works like this:
- Start the registry. When a Eureka Server starts, it launches a scheduled task that detects and evicts expired service instances (implemented by EvictionTask).
- Start the provider cluster. When a service provider starts, it registers itself with the registry (driven by DiscoveryClient through InstanceInfoReplicator) and starts a scheduled task that periodically sends heartbeats to the registry (DiscoveryClient's HeartbeatThread).
- Start the consumer cluster. When a service consumer starts, it fetches the full registry from the server (DiscoveryClient#getAndStoreFullRegistry()) and starts a scheduled task that periodically fetches incremental updates (DiscoveryClient#getAndUpdateDelta()).
- When the registry runs as a cluster, registration information received by one node is replicated to the other Eureka Servers. This is a high-availability scheme: even if some Eureka Servers fail, the cluster as a whole keeps working.
- The provider sends heartbeats to tell the registry "I am still alive, please do not evict me."
- After the consumer has obtained the provider information, it caches it locally. When it needs to call a provider, it picks an instance from the cached list and invokes it through client-side load balancing. Because the provider list is cached locally, services can still be called even when the registry cluster is unavailable.
This is only a quick overview of the Eureka workflow; the rest of the article analyzes it from the source code.
2 Service Partitioning
Eureka partitions services by region and zone. A region can be thought of as a geographic area, such as Asia or North China, with no fixed size limit. A zone can be thought of as a concrete data center inside a region: if a region is defined as Shanghai and Shanghai has two data centers, the region can be split into zone1 and zone2.
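As a rough illustration of how such a layout is expressed in configuration (the property keys are the standard Spring Cloud Netflix ones, but the region/zone names and the server URLs below are made up for the example), a single region with two zones could look like this:
# the region this client belongs to, and the zones inside that region
eureka.client.region=shanghai
eureka.client.availability-zones.shanghai=zone1,zone2
# Eureka Server addresses, one list per zone (hypothetical hosts)
eureka.client.service-url.zone1=http://eureka-zone1:8761/eureka/
eureka.client.service-url.zone2=http://eureka-zone2:8761/eureka/
# the zone this particular instance lives in
eureka.instance.metadata-map.zone=zone1
With such a setup, a client in zone1 prefers the Eureka Servers and providers of zone1 and only falls back to zone2 when zone1 has no usable instances.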
Regions and zones make a highly available, multi-active deployment possible, as shown in the figure below:
Under normal conditions, Consumer1 in zone1 consumes the service provided by Provider1 in the same zone, which keeps the call overhead as low as possible.
When Provider1 in zone1 becomes unavailable, the consumer fails over to Provider2 in zone2, as shown in the figure below:
3 Comparison with Zookeeper
1) Eureka guarantees AP
In CAP terms, Eureka chooses AP: all Eureka nodes are peers, so losing a few of them does not affect the remaining nodes, which can still serve registrations and queries. As long as one Eureka Server is still up, registration stays available (availability is preserved); the data returned may simply not be the latest (strong consistency is not guaranteed).
If the heartbeats actually received drop below 85% of the expected number (the expected number is recalculated periodically, every 15 minutes by default), Eureka assumes that there is a network problem between the clients and the registry and enters self-preservation mode. In that mode:
- Eureka no longer evicts services from the registry just because their heartbeats have not arrived in time.
- Eureka still accepts new registrations and serves queries, but the new data is not synchronized to other nodes (i.e., the current node remains available).
- When the network recovers, the new registrations on this node are synchronized to the other nodes.
2) Zookeeper guarantees CP
In CAP terms, Zookeeper chooses CP: while the Zookeeper cluster is electing a leader, services can neither be registered nor looked up; and when more than half of the machines in the cluster are unavailable, no leader can be elected and the whole cluster is effectively down. In this respect Zookeeper is not the best choice for a service registry.
II. Service Registration
1 Registration by the Service Provider
1) Source Code Analysis
Service registration consists of two steps: register and renew. Registering pushes the provider's service information to the Eureka Server; after registering, the provider keeps sending heartbeats (renewals) to tell the Eureka Server that it is still alive, so that the server's eviction task does not remove the instance from the service list.
For DiscoveryClient, the main logic is: when a DiscoveryClient is created, it registers the current service with the Eureka Server and starts a scheduled task that keeps renewing the lease.
a) Register
The constructor of DiscoveryClient calls initScheduledTasks(), which sets up the tasks that perform registration and renewal. The (abridged) code is as follows:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// code omitted
initScheduledTasks();
// code omitted
}
initScheduledTasks contains the core logic of registration and renewal: it registers the service through InstanceInfoReplicator and creates scheduled tasks that keep renewing the lease.
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh", scheduler, cacheRefreshExecutor,
registryFetchIntervalSeconds, TimeUnit.SECONDS,
expBackOffBound, new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat", scheduler, heartbeatExecutor,
renewalIntervalInSecs, TimeUnit.SECONDS,
expBackOffBound, new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
InstanceInfoReplicator#start schedules InstanceInfoReplicator#run, which in turn triggers the registration by calling DiscoveryClient#register. The registration source is as follows:
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
Below is an excerpt of the registration payload (the instanceInfo); it contains everything a consumer needs to locate the service provider.
b) Renew
The renewal interval and the lease expiration time are controlled by the following two properties (with these defaults an instance sends a heartbeat every 30 seconds, and its lease expires after 90 seconds without a heartbeat, i.e. roughly three missed heartbeats):
eureka.instance.lease-renewal-interval-in-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=90
In DiscoveryClient#initScheduledTasks a scheduled task is started that keeps renewing the lease.
private void initScheduledTasks() {
// code omitted
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// code omitted
}
The renewal task eventually invokes DiscoveryClient#renew, which sends a heartbeat to the Eureka Server. Note that if the server answers 404 (for example because the instance has already been evicted), the client registers itself again.
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
2 How the Registry Manages Services
The registry's entry point for registration requests is ApplicationResource#addInstance, which delegates to PeerAwareInstanceRegistry#register. PeerAwareInstanceRegistryImpl#register first records the registration locally and then replicates the service information to the other registry nodes. The source is as follows:
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
Note: the server-side entry point for renewals is InstanceResource#renewLease.
1) Recording the Registration
The registration logic is fairly involved. In short, the received registration is put into a ConcurrentHashMap and the instance's timestamps and status are updated; this information is what the registry later uses to decide whether the service is still alive. The source is as follows:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) {
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
} else {
registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp);
}
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
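To make the data structure above concrete, here is a minimal sketch (heavily simplified; only the overall shape and the names registry and Lease correspond to the real code) of the two-level map used by AbstractInstanceRegistry, keyed by appName and then by instanceId:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {

    // Tiny stand-in for Eureka's Lease<InstanceInfo>: the real class also tracks
    // registration/eviction timestamps and the lease duration used by the eviction task.
    static class Lease<T> {
        final T holder;
        volatile long lastUpdateTimestamp = System.currentTimeMillis();
        Lease(T holder) { this.holder = holder; }
        void renew() { lastUpdateTimestamp = System.currentTimeMillis(); }
    }

    // outer key: appName, inner key: instanceId
    private final ConcurrentHashMap<String, Map<String, Lease<String>>> registry = new ConcurrentHashMap<>();

    // register: create the per-application map on demand, then store (or replace) the lease
    void register(String appName, String instanceId) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(instanceId, new Lease<>(instanceId));
    }

    // renew: refresh the timestamp of an existing lease; a missing lease corresponds to the
    // 404 case on the server side, which makes the client call register() again
    boolean renew(String appName, String instanceId) {
        Map<String, Lease<String>> gMap = registry.get(appName);
        Lease<String> lease = (gMap == null) ? null : gMap.get(instanceId);
        if (lease == null) {
            return false;
        }
        lease.renew();
        return true;
    }
}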
2) Replicating Service Information to Other Registry Nodes
Replicating registrations to the other registry nodes is what makes the cluster highly available: a service registers with only one Eureka Server, yet every server in the cluster sees the instance, so the cluster keeps working even when some registry machines are down.
The core of the replication logic is: first decide whether replication is needed at all, then obtain all peer Eureka Server nodes and copy the information to each of them.
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
replicateInstanceActionsToPeers performs a different operation depending on the action. During service registration the action is Register, so node.register is used to push the instance information to the other registry nodes.
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
3) Provider Shutdown
When a service provider shuts down, ApplicationResource#addInstance is still called; the difference is that during registration the InstanceInfo carries status=UP, whereas on shutdown it carries status=DOWN. Because the status is different, the shutdown call does not refresh the lease timestamps.
4) Evicting Expired Services
When a Eureka Server starts, it creates a timer that periodically checks for expired services and removes them (every 60 seconds by default, per eviction-interval-timer-in-ms). AbstractInstanceRegistry#postInit sets up this eviction task; the source is as follows:
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
AbstractInstanceRegistry#evict contains the core eviction logic (a worked example of the eviction limit follows the code below):
- First check whether evicting expired leases is currently allowed at all (isLeaseExpirationEnabled).
- If eviction is allowed, walk through the registry (a ConcurrentHashMap) and check every lease for expiration.
- Expired leases are collected into the expiredLeases list.
- The number of instances to evict is Math.min(expiredLeases.size(), evictionLimit); that many leases are then evicted in random order.
The source is as follows:
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
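To put numbers on the eviction limit computed above (the figures are invented for illustration): with the default renewal-percent-threshold of 0.85 and a local registry of 100 instances, registrySizeThreshold = 100 * 0.85 = 85 and evictionLimit = 100 - 85 = 15. So even if 40 leases have expired in this round, toEvict = Math.min(40, 15) = 15 and only 15 randomly chosen instances are evicted; the remaining expired leases have to wait for later eviction runs.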
PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled decides whether expired leases may currently be evicted. The source is as follows:
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
The configuration item eureka.server.enable-self-preservation controls the registry's self-preservation mechanism and defaults to true; isSelfPreservationModeEnabled() reflects this setting. With self-preservation enabled, Eureka compares the heartbeats received in the last minute against the expected number (recomputed periodically, every 15 minutes by default); when the received count falls below 85% of the expected count, eviction is suspended and providers are not removed. If self-preservation is disabled, the registry always evicts instances whose leases have expired, so genuinely dead instances are removed promptly.
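As a worked example of the threshold (the instance count is invented): with the default 30-second renewal interval each instance is expected to send 2 heartbeats per minute, so with 10 registered instances expectedNumberOfRenewsPerMin = 20 and numberOfRenewsPerMinThreshold = (int) (20 * 0.85) = 17. If the number of heartbeats received in the last minute drops to 17 or fewer, isLeaseExpirationEnabled() returns false and the eviction task stops removing instances until the renewal rate recovers.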
III. Service Discovery
1 How the Consumer Fetches Services
When a consumer starts, it sends a REST request to the registry to fetch the list of registered services. For performance, the Eureka Server maintains a read-only copy of the service list to return to consumers, and the consumer caches the list locally as well, refreshing it every 30 seconds by default. Two related properties:
eureka.client.fetch-registry=true // whether to fetch the registry at all
eureka.client.registry-fetch-interval-seconds=30 // how often to fetch it, in seconds
1) Fetching Services at Startup
When DiscoveryClient starts, the following code triggers the initial fetch of the registered services; if that fails, fetchRegistryFromBackup is used as a fallback.
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
Fetching and caching the services is implemented by fetchRegistry: it obtains the provider information through getAndStoreFullRegistry() (full fetch) or getAndUpdateDelta() (delta fetch), then refreshes the local cache and finally updates the instance's remote status. When a delta is applied, the client also compares the hash code of the merged registry with the one sent by the server and falls back to a full fetch if they differ.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
2) Periodic Updates
When DiscoveryClient starts, it also schedules a task that periodically refreshes the service information.
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh", scheduler, cacheRefreshExecutor,
registryFetchIntervalSeconds, TimeUnit.SECONDS,
expBackOffBound, new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// code omitted
}
2 How the Registry Handles Fetch Requests
1) Full Fetch
When the consumer starts for the first time it performs a full fetch; the server-side resource entry involved is shown below.
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
return new InstanceResource(this, id, serverConfig, registry);
}
As the screenshot below shows, when service information is requested the registry returns the cached service list directly (the cache is ResponseCacheImpl, which keeps a read-write map plus a read-only map that is refreshed from it periodically, every 30 seconds by default).
2) Delta Fetch
The delta logic below handles the consumer's getAndUpdateDelta() requests.
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
IV. Service Consumption
1 Load Balancing
Once a RestTemplate is annotated with @LoadBalanced, LoadBalancerAutoConfiguration adds a LoadBalancerInterceptor to it when the RestTemplate is created. The source is as follows:
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
LoadBalancerInterceptor#intercept intercepts every request; the core of the interception is to invoke the provider's service through the load balancer, as shown below:
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(final ServiceInstance instance)
throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance);
return execution.execute(serviceRequest, body);
}
});
}
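For context, a consumer typically obtains such a RestTemplate as shown in the sketch below (a standard Spring Cloud setup; the service id PROVIDER-SERVICE in the comment is made up). The host part of any URL used with this RestTemplate is exactly what intercept() above extracts as serviceName:
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
class RestTemplateConfig {

    // @LoadBalanced marks the bean so that LoadBalancerAutoConfiguration attaches the
    // LoadBalancerInterceptor shown above to this RestTemplate.
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

// A call such as restTemplate.getForObject("http://PROVIDER-SERVICE/hello", String.class)
// then uses PROVIDER-SERVICE (a hypothetical service id) as the host, which the interceptor
// resolves against the locally cached registry and replaces with a concrete instance address.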
A request made through the RestTemplate passes this interceptor and is finally executed by LoadBalancerClient#execute. The main steps of RibbonLoadBalancerClient#execute are:
- First obtain an ILoadBalancer for the service.
- Use the load balancer's rule to choose a concrete provider server; by default the load balancer is a ZoneAwareLoadBalancer.
- Wrap the request context, issue the HTTP request and return the result.
The code is as follows:
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(ribbonServer);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
2 Routing Rules
The load-balancing behavior is largely determined by the routing rule (IRule) in use.
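For reference (provider-service below is a hypothetical service id), the rule used for a given client can be switched through Ribbon's per-service configuration; the class name points at one of the rules described in this section:
# use RandomRule instead of the default rule for calls to provider-service
provider-service.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule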
1) RandomRule
Picks a service instance at random, using java.util.Random to choose among the available servers.
2) RoundRobinRule
Selects instances in round-robin order and is the default rule. The implementation keeps a counter that is incremented on every selection; the instance to use is determined by the counter modulo the total number of instances, as sketched below.
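A minimal sketch of that idea (simplified; the real RoundRobinRule re-reads the reachable-server list from the ILoadBalancer on every call and advances the counter with a CAS loop):
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified round-robin selection: increment a shared counter and take it modulo the
// number of servers, so successive calls walk through the instances one by one.
class SimpleRoundRobin<T> {
    private final AtomicInteger counter = new AtomicInteger(0);

    T choose(List<T> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        int index = Math.abs(counter.getAndIncrement() % servers.size());
        return servers.get(index);
    }
}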
3) RetryRule
Round robin plus retry: it first tries to obtain an instance via round robin; if the chosen instance is unavailable it keeps retrying within a time limit (500 ms by default). If no usable instance is found before the deadline, null is returned.
4) WeightedResponseTimeRule
Computes a weight for each instance based on how it has been performing and then chooses instances according to these weights, which leads to better instance selection.
WeightedResponseTimeRule runs a timer task called DynamicServerWeightTask that recomputes the weights every 30 seconds by default. The rule is simple: the shorter an instance's average response time, the larger its weight and the more likely it is to be chosen.
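As a rough worked example of the weighting (the response times are invented): suppose three instances have average response times of 10 ms, 40 ms and 80 ms. The total is 130 ms, so the individual weights are 130 - 10 = 120, 130 - 40 = 90 and 130 - 80 = 50, giving accumulated boundaries of 120, 210 and 260. A random number is then drawn in [0, 260): instance 1 is chosen with probability 120/260 ≈ 46%, instance 2 with 90/260 ≈ 35% and instance 3 with 50/260 ≈ 19%, so the fastest instance is picked most often.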
5) ClientConfigEnabledRoundRobinRule
Uses the same strategy as RoundRobinRule (it delegates to an internal RoundRobinRule).
6) BestAvailableRule
Filters out instances that loadBalancerStats marks as unavailable and then picks the instance with the fewest concurrent requests. If loadBalancerStats is null, it falls back to round robin.
7) PredicateBasedRule
Uses an internal predicate (filter) to narrow down the instance list and then applies round robin over the remaining instances.
8) ZoneAvoidanceRule
The filter in ZoneAvoidanceRule is a CompositePredicate that combines ZoneAvoidancePredicate as the primary condition with AvailabilityPredicate as the secondary condition; round robin is then applied over the instances that pass the filter.