SpringCloud源码剖析-Eureka Client服务发现

简介: 我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

前言

什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《Eureak服务注册》流程,这一章节我们来研究一下Eureak服务发现。

1.初始化服务发现定时任务

学习过上一章节我们知道,在DiscoveryClient初始化过程中会初始化很多的定时任务其中就有对服务发现定时任务,如下:

/**初始化定时调度任务* Initializes all scheduled tasks.*/privatevoidinitScheduledTasks() {
//判断是否开启服务发现功能if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer//刷新注册表定时任务时间间隔intregistryFetchIntervalSeconds=clientConfig.getRegistryFetchIntervalSeconds();
intexpBackOffBound=clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//定时任务调度器scheduler.schedule(
newTimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//属性注册表任务newCacheRefreshThread()
            ),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ....省略代码....

首先判断了clientConfig配置中是否开启了服务发现功能(默认开启)shouldFetchRegistry,然后获取到服务发现定时任务间隔时间registryFetchIntervalSeconds(30s)后就初始化了服务发现的定时任务CacheRefreshThread,它本身是一个Runnable它是Discovery的内部类 ,跟踪进去

classCacheRefreshThreadimplementsRunnable {
publicvoidrun() {
//调用刷新注册表方法refreshRegistry();
    }
}
@VisibleForTestingvoidrefreshRegistry() {
try {
//这里在获取Regions 不为空,默认为空booleanisFetchingRemoteRegionRegistries=isFetchingRemoteRegionRegistries();
booleanremoteRegionsModified=false;
// This makes sure that a dynamic change to remote regions to fetch is honored.//这里在获取eureka:client:fetch-remote-regions-registry配置,即远程regions,默认为空StringlatestRemoteRegions=clientConfig.fetchRegistryForRemoteRegions();
if (null!=latestRemoteRegions) {
            ....省略.....
            }
//开始获取注册表fetchRegistry(false);booleansuccess=fetchRegistry(remoteRegionsModified);
if (success) {
//获取注册表大小registrySize=localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp=System.currentTimeMillis();
        }
if (logger.isDebugEnabled()) {
            ...省略...
            }
    } catch (Throwablee) {
logger.error("Cannot fetch registry from server", e);
    }
}
...省略代码...

CacheRefreshThread.refreshRegistry方法 中首先会拉取Regions区域列表,默认为空,然后执行Discovery.fetchRegistry方法拉取注册表,继续跟踪下去

privatebooleanfetchRegistry(booleanforceFullRegistryFetch) {
//计算器开始Stopwatchtracer=FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all// applications//这里是获取本地所有的注册的服务应用,里面包含了注册的所有的服务Applicationsapplications=getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
||forceFullRegistryFetch|| (applications==null)
|| (applications.getRegisteredApplications().size() ==0)
|| (applications.getVersion() ==-1)) //Client application does not have latest library supporting delta        {
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications==null));
logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() ==0));
logger.info("Application version is -1: {}", (applications.getVersion() ==-1));
//获取并存储完整注册表getAndStoreFullRegistry();
        } else {
//从eureka服务器获取注册表信息,差别获取,并在本地更新getAndUpdateDelta(applications);
        }
applications.setAppsHashCode(applicat                                     ...省略代码...                  

在fetchRegistry方法中,首先会通过getApplications();得到Applications本地注册服务列表,然后这里有两种情况,一是调用getAndStoreFullRegistryeureka服务器全量获取注册表,而是调用getAndUpdateDelta(applications);从eureka服务器获取有差别注册表信息,并在本地更新,在项目刚启动的时候会使用全量,后续会采用差别获取,

2.注册表全量获取

DiscoveryClient.getAndStoreFullRegistry的作用是获得并存储完整的注册表,跟踪进去

privatevoidgetAndStoreFullRegistry() throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applicationsapps=null;
//通过eurekaTransport.queryClient得到一个EurekaHttpClientEurekaHttpResponse<Applications>httpResponse=clientConfig.getRegistryRefreshSingleVipAddress() ==null?eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
        : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
apps=httpResponse.getEntity();
    }
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps==null) {
logger.error("The application is null for some reason. Not storing this information");
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
//存储注册表到Applications中localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
logger.warn("Not updating applications as another thread is updating it already");
    }
}

我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

@OverridepublicEurekaHttpResponse<Applications>getApplications(finalString... regions) {
returnexecute(newRequestExecutor<Applications>() {
@OverridepublicEurekaHttpResponse<Applications>execute(EurekaHttpClientdelegate) {
//最终会调用AbstractJerseyEurekaHttpClient获取注册表returndelegate.getApplications(regions);
        }
@OverridepublicRequestTypegetRequestType() {
returnRequestType.GetApplications;
        }
    });
}

这里通过装饰类,先后会执行

RetryableEurekaHttpClient(失败重试),

RedirectingEurekaHttpClient(重定向到不同的EurekaServer)

MetricsCollectingEurekaHttpClient(统计执行情况)

最终通过AbstractJerseyEurekaHttpClient,使用jersey发起注册表的获取,源码如下

publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient {
@OverridepublicEurekaHttpResponse<Applications>getApplications(String... regions) {
returngetApplicationsInternal("apps/", regions);
    }
    ...省略...
privateEurekaHttpResponse<Applications>getApplicationsInternal(StringurlPath, String[] regions) {
ClientResponseresponse=null;
StringregionsParamValue=null;
try {
//得到一个web请求,serviceUrl是服务器的地址WebResourcewebResource=jerseyClient.resource(serviceUrl).path(urlPath);
if (regions!=null&&regions.length>0) {
regionsParamValue=StringUtil.join(regions);
webResource=webResource.queryParam("regions", regionsParamValue);
            }
BuilderrequestBuilder=webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送get请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applicationsapplications=null;
if (response.getStatus() ==Status.OK.getStatusCode() &&response.hasEntity()) {
//获取返回的服务注册表applications=response.getEntity(Applications.class);
            }
//创建一个相应对象EurekaHttpResponseBuilderreturnanEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
        } finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue==null?"" : "regions="+regionsParamValue,
response==null?"N/A" : response.getStatus()
                            );
            }
if (response!=null) {
response.close();
            }
        }
    }

3.注册表差别获取

在系统第一次启动的时候会调用DiscoveryClient.getAndStoreFullRegistry拉取所有的注册表本存储到本地Applications中,之后会定时30s/次使用DiscoveryClient.getAndUpdateDelta(applications);差别更新注册表,源码如下

privatevoidgetAndUpdateDelta(Applicationsapplications) throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
Applicationsdelta=null;
//发送请求获取注册表EurekaHttpResponse<Applications>httpResponse=eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
delta=httpResponse.getEntity();
    }
if (delta==null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+"Hence got the full registry.");
getAndStoreFullRegistry();
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
StringreconcileHashCode="";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//更新本地注册表updateDelta(delta);
reconcileHashCode=getReconcileHashCode(applications);
            } finally {
fetchRegistryUpdateLock.unlock();
            }
        } else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) ||clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall        }
    } else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

这里依然是通过eurekaTransport.queryClient.获取到服务注册列表,和全量不一样的地方是和本地的Applications中的服务列表做对比,把新的服务注册信息添加到Applications做更新操作

4.服务发现总结

1.每30s/次定时任务调用DiscoveryClient.CacheRefreshThread内部类(Runnable)进行服务注册表的拉取

2.CacheRefreshThread的run方法调用DiscoveryClient.fetchRegistry()拉取服务注册表

3.fetchRegistry方法中先通过Applications applications = getApplications();从本地缓存中获取服务注册表Applications ,然后做出2种处理全量更新discoveryClient.getAndStoreFullRegistry,或者差别更新discoveryClient.getAndUpdateDelta

4.系统启动的时候会做全量更新,通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,获取到注册表存储到本地缓存Applications

5.后续的定时更新注册表都是采用差别更新,也是通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,然后进行一个对比,更新本地的注册表Applications

目录
相关文章
|
9天前
|
Java 应用服务中间件 Nacos
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
22 0
|
12天前
|
监控 数据可视化 安全
一套成熟的Spring Cloud智慧工地平台源码,自主版权,开箱即用
这是一套基于Spring Cloud的智慧工地管理平台源码,具备自主版权,易于使用。平台运用现代技术如物联网、大数据等改进工地管理,服务包括建设各方,提供人员、车辆、视频监控等七大维度的管理。特色在于可视化管理、智能报警、移动办公和分布计算存储。功能涵盖劳务实名制管理、智能考勤、视频监控AI识别、危大工程监控、环境监测、材料管理和进度管理等,实现工地安全、高效的智慧化管理。
|
3天前
|
设计模式 安全 Java
【初学者慎入】Spring源码中的16种设计模式实现
以上是威哥给大家整理了16种常见的设计模式在 Spring 源码中的运用,学习 Spring 源码成为了 Java 程序员的标配,你还知道Spring 中哪些源码中运用了设计模式,欢迎留言与威哥交流。
|
6天前
|
Java Shell 测试技术
Spring源码搭建教程
Spring源码搭建教程
10 0
|
7天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
8天前
|
负载均衡 监控 容灾
【SpringCloud】详解Eureka注册中心
【SpringCloud】详解Eureka注册中心
18 0
|
8天前
|
Java Nacos 开发者
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
|
13天前
|
Java Maven Nacos
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
25 0
|
14天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
14天前
|
人工智能 监控 安全
Java+Spring Cloud +Vue+UniApp微服务智慧工地云平台源码
视频监控系统、人员实名制与分账制管理系统、车辆管理系统、环境监测系统、大型设备监测(龙门吊、塔吊、升降机、卸料平台等)、用电监测系统、基坑监测系统、AI算法分析(安全帽佩戴、火焰识别、周界报警、人员聚众报警、升降机超载报警)、安全培训、设备监测。
20 4