Eureka Client 源码
Eureka Client 服务更新访问的接口信息
- 服务初始化注册
- 服务发送心跳信息
- 服务列表拉取,全量拉取
- 服务列表拉取,增量拉取
一. 初始化过程
// DiscoveryClient DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh // 定时任务的调度类 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳的线程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 配置刷新的线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // shouldFetchRegistry 关闭服务发现,不去拉取其它服务列表,自己只是作为服务提供者 if (clientConfig.shouldFetchRegistry()) { try { // 服务发现 // 1. 先去 eureka 上面拿信息 // 2. 然后去 eureka 上注册 boolean primaryFetchRegistryResult = fetchRegistry(false); if (!primaryFetchRegistryResult) { logger.info("Initial registry fetch from primary servers failed"); } boolean backupFetchRegistryResult = true; // 服务端拿不到注册信息,就去备用服务器拿注册信息 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { backupFetchRegistryResult = false; logger.info("Initial registry fetch from backup servers failed"); } if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { logger.error("Fetch registry error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // shouldRegisterWithEureka 自己作为一个消费者,自己只是一个消费者,不去做服务注册 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // 服务注册 if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); }
服务注册
// 服务注册 // DiscoveryClient boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); } // AbstractJerseyEurekaHttpClient // 本质就是一个 Jersey 对 Eureka Server 注册接口调用 @Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); // info 客户端的数据 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } } // 服务端就是在 ApplicationResource#addInstance 处理
二. 心跳连接
// DiscoveryClient boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); // 判断是否返回的是 NOT_FOUND 404 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
三. 服务发现
- 是否全量拉取, 拉取 eureka 所有的注册信息。
- 是否增量拉取, 拉取 eureka 最近三分钟数据;是否有更新的信息,如果有更新
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications // 获取客户端缓存信息 Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() // 配置只拉取全量 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) // VIP 地址, 当前 eureka client 是否对单一的注册地址感兴趣 || forceFullRegistryFetch // 强制全量拉取 || (applications == null) // 初始化的时候是全量 || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); // 全量拉取 } else { getAndUpdateDelta(applications); // 增量拉取 } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status onCacheRefreshed(); // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
增量服务拉取, 防止并发这里有一个加锁的操作。
// 服务增量拉取 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
服务端 recentlyChangedQueue 清空 3 分钟内被修改的数据
30 秒执行一次定时任务, 定时任务里面清空 3 分钟没有更新的微服务实例
让我们的增量 , 与全量数据的 hashCode
client: 本地的数 hashCode + 增量的 hashCode
与服务端传过来的 hashCode 判断是否相同
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions); apps.setAppsHashCode(allApps.getReconcileHashCode());
四. 服务下架
- 手动下架,注入
DiscoveryClient
然后调用 shutdown 方法
// DiscoveryClient public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); Monitors.unregisterObject(this); logger.info("Completed shut down of DiscoveryClient"); } }
Eureka 总结
Eureka 是一个比较优秀的服务注册中心, 实现了 AP 主要是保证可用性和分区容错性。
- Eureka Server 的所有的节点配置信息是存储在内存中的,查询服务服务注册信息使用了多层缓存。
- 多层缓存:首先会进行一级缓存
**readOnlyCacheMap**
读取,然后读取二级缓存**readWriteCacheMap**
, 最后读取真实数据 - 缓存过期,在接收到 register , renew cancel 请求后会失效二级缓存;服务剔除会删除二级缓存;二级缓存本身过期。
- 缓存更新,一级缓存中查询不到的时候,会从二级缓存中查询, 如果二级缓存中存在,一级缓存中不存在会同步到一级缓存中。定时任务 3分钟也会主动同步一次一级缓存。
- Eureka Server 集群环境中,注册的服务只会向一台服务发起注册,然后当前服务端节点会遍历其它节点进行注册信息同步。
- Eureka Client 默认会 30 秒中发送一次心跳来进行续约,告知 Eureka Server 客户端依然存活没有问题,如果 Eureka Server 90 秒钟没有收到客户端的续约,它会将实例从注册表中删除。
- Eureka Server自我保护机制,当大量的服务过期的时候,存活服务低于 85% 的时候,就会启动自我保护机制,每次只会下线 15%的服务。