四. 集群原理
- 集群同步由
PeerAwareInstanceRegistryImpl
来负责处理。
- 如果是一个 client node 注册到 server node 那么接收到这个 server node 会把 client node 的请求 转发 到其它的 server node
// 集群同步 private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication // 所有集群节点(peerEurekaNodes)为空或者是一个集群操作(isReplication),防止死循环注册 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // 同步到所有节点 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { // 排除当前节点 continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
- 集群同步事件处理
replicateInstanceActionsToPeers
// 集群同步事件处理 private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: // 取消 node.cancel(appName, id); break; case Heartbeat: // 心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: // 注册 node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } finally { CurrentRequestVersion.remove(); } }
- 集群启动同步
public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
五. 自我保护机制
- 是否启动自我保护机制
// 配置参数 isSelfPreservationModeEnabled public boolean isSelfPreservationModeEnabled() { return serverConfig.shouldEnableSelfPreservation(); }
- 触发自我保护机制的配置, 触发的条件,短时间内,大量的心跳连接过期。就是大量宕机。eureka 就会触发自我保护机制。大量的阈值是 85%。
// 触发自我保护机制的配置阈值 numberOfRenewsPerMinThreshold public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
- 服务注册的执行的自我保护机制
// The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } }
更改自我保护的阈值 updateRenewsPerMinThreshold
// getExpectedClientRenewalIntervalSeconds 默认每分钟发送心跳字数默认两次 // 计算公式: 预估心跳值(所有注册上来的实例) * 每分钟触发心跳连接次数(60s / 服务端每分钟刷新时间默认 30s) * 自我保护机制触发的百分比(默认 85%) // 就是 15% 没有连接上,就触发自我保护机制 protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); }
自我保护机制的定时更改
private void scheduleRenewalThresholdUpdateTask() { timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); } 复制代码
总结:如果想要自我保护机制正常工作,需要客户端的每分钟心跳次数与服务端的配置相同
六. 缓存机制
Eureka 3 层缓存
- 只读缓存:
ConcurrentHashMap
- 读写缓存:
guava
- 真实数据:
ConcurrentHashMap
目的:如果直接操作真实数据,这样就可以减少锁的抢占,带来效率上的提升;会出现的问题就是数据不一致,当时不能带来强一致性 95%
- 服务发现,获取服务信息的获取
@GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { // responseCache 缓存对象 response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } CurrentRequestVersion.remove(); return response; }
最终查询调用 getValue
方法
// ResponseCacheImpl Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { // 1. 只读缓存 final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { // 2. 读写缓存 guava payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; }
查询真实数据
private Value generatePayload(Key key) { Stopwatch tracer = null; try { String payload; switch (key.getEntityType()) { case Application: boolean isRemoteRegionRequested = key.hasRegions(); if (ALL_APPS.equals(key.getName())) { if (isRemoteRegionRequested) { tracer = serializeAllAppsWithRemoteRegionTimer.start(); payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); } else { tracer = serializeAllAppsTimer.start(); payload = getPayLoad(key, registry.getApplications()); } } else if (ALL_APPS_DELTA.equals(key.getName())) { if (isRemoteRegionRequested) { tracer = serializeDeltaAppsWithRemoteRegionTimer.start(); versionDeltaWithRegions.incrementAndGet(); versionDeltaWithRegionsLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); } else { tracer = serializeDeltaAppsTimer.start(); versionDelta.incrementAndGet(); versionDeltaLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltas()); } } else { tracer = serializeOneApptimer.start(); payload = getPayLoad(key, registry.getApplication(key.getName())); } break; case VIP: case SVIP: tracer = serializeViptimer.start(); payload = getPayLoad(key, getApplicationsForVip(key, registry)); break; default: logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType()); payload = ""; break; } return new Value(payload); } finally { if (tracer != null) { tracer.stop(); } } }
缓存读取总结:
- 首先进入只读缓存;
- 如果只读缓存没有的话就进入读写缓存;
- 如果读写缓存也没有, 就执行监听器的逻辑,从真实数据里面拿。
缓存在什么时候会发生更改?
- 只读缓存修改的地点, 只读缓存只能通过定时任务,每 30 秒进行一次同步更新。
- 如果只读缓存找不到,但是读写缓存可以查询到,就更新到只读缓存。
- 延迟的统计:30秒计时器延迟 + 客户端缓存延迟 30s + ribbon (1s) = 61秒
// ResponseCacheImpl Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { // 如果只读缓存找不到,但是读写缓存可以查询到的时候会更新。 payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; } // 只读缓存更新,30s 执行一次同步数据 private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; }
当做服务发现的时候会锁住注册(修改、下架)操作,服务发现的时候加的是写锁, 注册(修改、下架)操作时读锁;
这样设计的目的是为了尽可能的保证读的准确性。