前言
这一章我们来分析一下Eureka Server 服务注册表的拉取流程,请结合《Eureka Client服务发现》
在《Eureka Client服务发现》我们分析了,客户端会通过两种方式从服务端拉取注册表,在客户端系统启动的时候会进行全量拉取,随后默认30s/次会进行差异更新,那么在Eureka Server 服务端是如何处理服务注册表全量拉取和差异更新的呢?
全量拉取
Eureka Client向Eureka Server发请求,拉取服务注册表,Server端还是通过ServeltContainer接待请求,最终交给com.netflix.eureka.resources.ApplicationsResource#getContainers处理
/**返回所有的应用* Get information about all {@link com.netflix.discovery.shared.Applications}.*/publicResponsegetContainers( ("version") Stringversion, HEADER_ACCEPT) StringacceptHeader, (HEADER_ACCEPT_ENCODING) StringacceptEncoding, (EurekaAccept.HTTP_X_EUREKA_ACCEPT) StringeurekaAccept, (UriInfouriInfo, "regions") StringregionsStr) { (booleanisRemoteRegionRequested=null!=regionsStr&&!regionsStr.isEmpty(); String[] regions=null; if (!isRemoteRegionRequested) { //注册表全量拉取统计计数增加EurekaMonitors.GET_ALL.increment(); } else { regions=regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can// restrict access if it is not// ready to serve traffic depending on various reasons.//检查服务端时候准备好可以被访问if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { returnResponse.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); //处理返回的数据类型默认JSONKeyTypekeyType=Key.KeyType.JSON; StringreturnMediaType=MediaType.APPLICATION_JSON; if (acceptHeader==null||!acceptHeader.contains(HEADER_JSON_VALUE)) { //请求头么有指定格式,返回XML格式keyType=Key.KeyType.XML; returnMediaType=MediaType.APPLICATION_XML; } //创建缓存keyKeycacheKey=newKey(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, //通过ALL_APPS构建key keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Responseresponse; //这里判断是否是GZIP格式,返回结果的编码类型不一样,获取方式是一致的if (acceptEncoding!=null&&acceptEncoding.contains(HEADER_GZIP_VALUE)) { //如果格式是gzip,调用responseCache.getGZIP(cacheKey)获取//底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表response=Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { //普通获取responseCache.get(cacheKey)//底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表response=Response.ok(responseCache.get(cacheKey)) .build(); } returnresponse; }
responseCache.getGZIP(cacheKey)最终会调用 com.netflix.eureka.registry.ResponseCacheImpl#getValue
/*** Get the payload in both compressed and uncompressed form.*/ValuegetValue(finalKeykey, booleanuseReadOnlyCache) { Valuepayload=null; try { if (useReadOnlyCache) { //从只读缓存中获取finalValuecurrentPayload=readOnlyCacheMap.get(key); if (currentPayload!=null) { payload=currentPayload; } else { //如果只读缓存中获取不到,从读写缓存中获取payload=readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload=readWriteCacheMap.get(key); } } catch (Throwablet) { logger.error("Cannot get value for key : {}", key, t); } returnpayload; }
差异更新
差异更新也在ApplicationsResource中:com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential,源码如下
/**获取Applications服务注册表中有改变的服务,注册,取消,状态更改和过期都会造成服务的改变* Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.** <p>* The delta changes represent the registry information change for a period* as configured by* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The* changes that can happen in a registry include* <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally* the changes to the registry are infrequent and hence getting just the* delta will be much more efficient than getting the complete registry.* </p>** <p>* Since the delta information is cached over a period of time, the requests* may return the same data multiple times within the window configured by* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients* are expected to handle this duplicate information.* <p>** @param version the version of the request.* @param acceptHeader the accept header to indicate whether to serve JSON or XML data.* @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.* @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}* @param uriInfo the {@link java.net.URI} information of the request made.* @return response containing the delta information of the* {@link AbstractInstanceRegistry}.*/"delta") (publicResponsegetContainerDifferential( "version") Stringversion, (HEADER_ACCEPT) StringacceptHeader, (HEADER_ACCEPT_ENCODING) StringacceptEncoding, (EurekaAccept.HTTP_X_EUREKA_ACCEPT) StringeurekaAccept, (UriInfouriInfo, ("regions") StringregionsStr) { booleanisRemoteRegionRequested=null!=regionsStr&&!regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration// has been disabled, redirect clients to get all instances//如果禁用了Delta注册表差异化拉取,或者服务不可访问,返回拒绝if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { returnResponse.status(Status.FORBIDDEN).build(); } String[] regions=null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions=regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); //处理反会的数据格式JSON默认KeyTypekeyType=Key.KeyType.JSON; StringreturnMediaType=MediaType.APPLICATION_JSON; if (acceptHeader==null||!acceptHeader.contains(HEADER_JSON_VALUE)) { keyType=Key.KeyType.XML; returnMediaType=MediaType.APPLICATION_XML; } //构建缓存keyKeycacheKey=newKey(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, //通过ALL_APPS_DELTA构建key keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding!=null&&acceptEncoding.contains(HEADER_GZIP_VALUE)) { //从responseCache获取内容returnResponse.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { returnResponse.ok(responseCache.get(cacheKey)) .build(); } }
总结
Eureka Server 拉取服务注册表的逻辑还是比较简单的,不管是全量拉取,还是差别拉取都是通过ApplicationsResource中处理,然后构建出不同的key,从ResponseCache中去获取服务。