前言
什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《Eureak服务注册》流程,这一章节我们来研究一下Eureak服务发现。
1.初始化服务发现定时任务
学习过上一章节我们知道,在DiscoveryClient初始化过程中会初始化很多的定时任务其中就有对服务发现定时任务,如下:
/**初始化定时调度任务* Initializes all scheduled tasks.*/privatevoidinitScheduledTasks() { //判断是否开启服务发现功能if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer//刷新注册表定时任务时间间隔intregistryFetchIntervalSeconds=clientConfig.getRegistryFetchIntervalSeconds(); intexpBackOffBound=clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //定时任务调度器scheduler.schedule( newTimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, //属性注册表任务newCacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } ....省略代码....
首先判断了clientConfig配置中是否开启了服务发现功能(默认开启)shouldFetchRegistry
,然后获取到服务发现定时任务间隔时间registryFetchIntervalSeconds
(30s)后就初始化了服务发现的定时任务CacheRefreshThread
,它本身是一个Runnable它是Discovery的内部类 ,跟踪进去
classCacheRefreshThreadimplementsRunnable { publicvoidrun() { //调用刷新注册表方法refreshRegistry(); } } voidrefreshRegistry() { try { //这里在获取Regions 不为空,默认为空booleanisFetchingRemoteRegionRegistries=isFetchingRemoteRegionRegistries(); booleanremoteRegionsModified=false; // This makes sure that a dynamic change to remote regions to fetch is honored.//这里在获取eureka:client:fetch-remote-regions-registry配置,即远程regions,默认为空StringlatestRemoteRegions=clientConfig.fetchRegistryForRemoteRegions(); if (null!=latestRemoteRegions) { ....省略..... } //开始获取注册表fetchRegistry(false);booleansuccess=fetchRegistry(remoteRegionsModified); if (success) { //获取注册表大小registrySize=localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp=System.currentTimeMillis(); } if (logger.isDebugEnabled()) { ...省略... } } catch (Throwablee) { logger.error("Cannot fetch registry from server", e); } } ...省略代码...
CacheRefreshThread.refreshRegistry
方法 中首先会拉取Regions区域列表,默认为空,然后执行Discovery.fetchRegistry
方法拉取注册表,继续跟踪下去
privatebooleanfetchRegistry(booleanforceFullRegistryFetch) { //计算器开始Stopwatchtracer=FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all// applications//这里是获取本地所有的注册的服务应用,里面包含了注册的所有的服务Applicationsapplications=getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) ||forceFullRegistryFetch|| (applications==null) || (applications.getRegisteredApplications().size() ==0) || (applications.getVersion() ==-1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications==null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() ==0)); logger.info("Application version is -1: {}", (applications.getVersion() ==-1)); //获取并存储完整注册表getAndStoreFullRegistry(); } else { //从eureka服务器获取注册表信息,差别获取,并在本地更新getAndUpdateDelta(applications); } applications.setAppsHashCode(applicat ...省略代码...
在fetchRegistry方法中,首先会通过getApplications();
得到Applications
本地注册服务列表,然后这里有两种情况,一是调用getAndStoreFullRegistry
从eureka服务器全量获取注册表,而是调用getAndUpdateDelta(applications);
从eureka服务器获取有差别注册表信息,并在本地更新,在项目刚启动的时候会使用全量,后续会采用差别获取,
2.注册表全量获取
DiscoveryClient.getAndStoreFullRegistry
的作用是获得并存储完整的注册表,跟踪进去
privatevoidgetAndStoreFullRegistry() throwsThrowable { longcurrentUpdateGeneration=fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applicationsapps=null; //通过eurekaTransport.queryClient得到一个EurekaHttpClientEurekaHttpResponse<Applications>httpResponse=clientConfig.getRegistryRefreshSingleVipAddress() ==null?eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) { apps=httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps==null) { logger.error("The application is null for some reason. Not storing this information"); } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) { //存储注册表到Applications中localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,
使用的是其装饰器EurekaHttpClientDecorator.getApplications
方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications
获取所有的服务注册列表,跟踪一下源码
publicEurekaHttpResponse<Applications>getApplications(finalString... regions) { returnexecute(newRequestExecutor<Applications>() { publicEurekaHttpResponse<Applications>execute(EurekaHttpClientdelegate) { //最终会调用AbstractJerseyEurekaHttpClient获取注册表returndelegate.getApplications(regions); } publicRequestTypegetRequestType() { returnRequestType.GetApplications; } }); }
这里通过装饰类,先后会执行
RetryableEurekaHttpClient
(失败重试),
RedirectingEurekaHttpClient
(重定向到不同的EurekaServer)
MetricsCollectingEurekaHttpClient
(统计执行情况)
最终通过AbstractJerseyEurekaHttpClient
,使用jersey发起注册表的获取,源码如下
publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient { publicEurekaHttpResponse<Applications>getApplications(String... regions) { returngetApplicationsInternal("apps/", regions); } ...省略... privateEurekaHttpResponse<Applications>getApplicationsInternal(StringurlPath, String[] regions) { ClientResponseresponse=null; StringregionsParamValue=null; try { //得到一个web请求,serviceUrl是服务器的地址WebResourcewebResource=jerseyClient.resource(serviceUrl).path(urlPath); if (regions!=null&®ions.length>0) { regionsParamValue=StringUtil.join(regions); webResource=webResource.queryParam("regions", regionsParamValue); } BuilderrequestBuilder=webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //发送get请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applicationsapplications=null; if (response.getStatus() ==Status.OK.getStatusCode() &&response.hasEntity()) { //获取返回的服务注册表applications=response.getEntity(Applications.class); } //创建一个相应对象EurekaHttpResponseBuilderreturnanEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue==null?"" : "regions="+regionsParamValue, response==null?"N/A" : response.getStatus() ); } if (response!=null) { response.close(); } } }
3.注册表差别获取
在系统第一次启动的时候会调用DiscoveryClient.getAndStoreFullRegistry
拉取所有的注册表本存储到本地Applications中,之后会定时30s/次使用DiscoveryClient.getAndUpdateDelta(applications);
差别更新注册表,源码如下
privatevoidgetAndUpdateDelta(Applicationsapplications) throwsThrowable { longcurrentUpdateGeneration=fetchRegistryGeneration.get(); Applicationsdelta=null; //发送请求获取注册表EurekaHttpResponse<Applications>httpResponse=eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) { delta=httpResponse.getEntity(); } if (delta==null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+"Hence got the full registry."); getAndStoreFullRegistry(); } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); StringreconcileHashCode=""; if (fetchRegistryUpdateLock.tryLock()) { try { //更新本地注册表updateDelta(delta); reconcileHashCode=getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) ||clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
这里依然是通过eurekaTransport.queryClient.获取到服务注册列表,和全量不一样的地方是和本地的Applications中的服务列表做对比,把新的服务注册信息添加到Applications做更新操作
4.服务发现总结
1.每30s/次定时任务调用DiscoveryClient.CacheRefreshThread
内部类(Runnable)进行服务注册表的拉取
2.CacheRefreshThread的run方法调用DiscoveryClient.fetchRegistry()
拉取服务注册表
3.fetchRegistry方法中先通过Applications applications = getApplications();
从本地缓存中获取服务注册表Applications
,然后做出2种处理全量更新discoveryClient.getAndStoreFullRegistry
,或者差别更新discoveryClient.getAndUpdateDelta
4.系统启动的时候会做全量更新,通过EurekaHttpClient
的装饰EurekaHttpClientDecorator
器调用AbstractJerseyEurekaHttpClient
使用jersey向EureakServer获取注册表,获取到注册表存储到本地缓存Applications
5.后续的定时更新注册表都是采用差别更新,也是通过EurekaHttpClient
的装饰EurekaHttpClientDecorator
器调用AbstractJerseyEurekaHttpClient
使用jersey向EureakServer获取注册表,然后进行一个对比,更新本地的注册表Applications