SpringCloud源码剖析-Eureka Client服务发现

简介: 我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

前言

什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《Eureak服务注册》流程,这一章节我们来研究一下Eureak服务发现。

1.初始化服务发现定时任务

学习过上一章节我们知道,在DiscoveryClient初始化过程中会初始化很多的定时任务其中就有对服务发现定时任务,如下:

/**初始化定时调度任务* Initializes all scheduled tasks.*/privatevoidinitScheduledTasks() {
//判断是否开启服务发现功能if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer//刷新注册表定时任务时间间隔intregistryFetchIntervalSeconds=clientConfig.getRegistryFetchIntervalSeconds();
intexpBackOffBound=clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//定时任务调度器scheduler.schedule(
newTimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//属性注册表任务newCacheRefreshThread()
            ),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ....省略代码....

首先判断了clientConfig配置中是否开启了服务发现功能(默认开启)shouldFetchRegistry,然后获取到服务发现定时任务间隔时间registryFetchIntervalSeconds(30s)后就初始化了服务发现的定时任务CacheRefreshThread,它本身是一个Runnable它是Discovery的内部类 ,跟踪进去

classCacheRefreshThreadimplementsRunnable {
publicvoidrun() {
//调用刷新注册表方法refreshRegistry();
    }
}
@VisibleForTestingvoidrefreshRegistry() {
try {
//这里在获取Regions 不为空,默认为空booleanisFetchingRemoteRegionRegistries=isFetchingRemoteRegionRegistries();
booleanremoteRegionsModified=false;
// This makes sure that a dynamic change to remote regions to fetch is honored.//这里在获取eureka:client:fetch-remote-regions-registry配置,即远程regions,默认为空StringlatestRemoteRegions=clientConfig.fetchRegistryForRemoteRegions();
if (null!=latestRemoteRegions) {
            ....省略.....
            }
//开始获取注册表fetchRegistry(false);booleansuccess=fetchRegistry(remoteRegionsModified);
if (success) {
//获取注册表大小registrySize=localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp=System.currentTimeMillis();
        }
if (logger.isDebugEnabled()) {
            ...省略...
            }
    } catch (Throwablee) {
logger.error("Cannot fetch registry from server", e);
    }
}
...省略代码...

CacheRefreshThread.refreshRegistry方法 中首先会拉取Regions区域列表,默认为空,然后执行Discovery.fetchRegistry方法拉取注册表,继续跟踪下去

privatebooleanfetchRegistry(booleanforceFullRegistryFetch) {
//计算器开始Stopwatchtracer=FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all// applications//这里是获取本地所有的注册的服务应用,里面包含了注册的所有的服务Applicationsapplications=getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
||forceFullRegistryFetch|| (applications==null)
|| (applications.getRegisteredApplications().size() ==0)
|| (applications.getVersion() ==-1)) //Client application does not have latest library supporting delta        {
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications==null));
logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() ==0));
logger.info("Application version is -1: {}", (applications.getVersion() ==-1));
//获取并存储完整注册表getAndStoreFullRegistry();
        } else {
//从eureka服务器获取注册表信息,差别获取,并在本地更新getAndUpdateDelta(applications);
        }
applications.setAppsHashCode(applicat                                     ...省略代码...                  

在fetchRegistry方法中,首先会通过getApplications();得到Applications本地注册服务列表,然后这里有两种情况,一是调用getAndStoreFullRegistryeureka服务器全量获取注册表,而是调用getAndUpdateDelta(applications);从eureka服务器获取有差别注册表信息,并在本地更新,在项目刚启动的时候会使用全量,后续会采用差别获取,

2.注册表全量获取

DiscoveryClient.getAndStoreFullRegistry的作用是获得并存储完整的注册表,跟踪进去

privatevoidgetAndStoreFullRegistry() throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applicationsapps=null;
//通过eurekaTransport.queryClient得到一个EurekaHttpClientEurekaHttpResponse<Applications>httpResponse=clientConfig.getRegistryRefreshSingleVipAddress() ==null?eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
        : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
apps=httpResponse.getEntity();
    }
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps==null) {
logger.error("The application is null for some reason. Not storing this information");
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
//存储注册表到Applications中localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
logger.warn("Not updating applications as another thread is updating it already");
    }
}

我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

@OverridepublicEurekaHttpResponse<Applications>getApplications(finalString... regions) {
returnexecute(newRequestExecutor<Applications>() {
@OverridepublicEurekaHttpResponse<Applications>execute(EurekaHttpClientdelegate) {
//最终会调用AbstractJerseyEurekaHttpClient获取注册表returndelegate.getApplications(regions);
        }
@OverridepublicRequestTypegetRequestType() {
returnRequestType.GetApplications;
        }
    });
}

这里通过装饰类,先后会执行

RetryableEurekaHttpClient(失败重试),

RedirectingEurekaHttpClient(重定向到不同的EurekaServer)

MetricsCollectingEurekaHttpClient(统计执行情况)

最终通过AbstractJerseyEurekaHttpClient,使用jersey发起注册表的获取,源码如下

publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient {
@OverridepublicEurekaHttpResponse<Applications>getApplications(String... regions) {
returngetApplicationsInternal("apps/", regions);
    }
    ...省略...
privateEurekaHttpResponse<Applications>getApplicationsInternal(StringurlPath, String[] regions) {
ClientResponseresponse=null;
StringregionsParamValue=null;
try {
//得到一个web请求,serviceUrl是服务器的地址WebResourcewebResource=jerseyClient.resource(serviceUrl).path(urlPath);
if (regions!=null&&regions.length>0) {
regionsParamValue=StringUtil.join(regions);
webResource=webResource.queryParam("regions", regionsParamValue);
            }
BuilderrequestBuilder=webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送get请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applicationsapplications=null;
if (response.getStatus() ==Status.OK.getStatusCode() &&response.hasEntity()) {
//获取返回的服务注册表applications=response.getEntity(Applications.class);
            }
//创建一个相应对象EurekaHttpResponseBuilderreturnanEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
        } finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue==null?"" : "regions="+regionsParamValue,
response==null?"N/A" : response.getStatus()
                            );
            }
if (response!=null) {
response.close();
            }
        }
    }

3.注册表差别获取

在系统第一次启动的时候会调用DiscoveryClient.getAndStoreFullRegistry拉取所有的注册表本存储到本地Applications中,之后会定时30s/次使用DiscoveryClient.getAndUpdateDelta(applications);差别更新注册表,源码如下

privatevoidgetAndUpdateDelta(Applicationsapplications) throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
Applicationsdelta=null;
//发送请求获取注册表EurekaHttpResponse<Applications>httpResponse=eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
delta=httpResponse.getEntity();
    }
if (delta==null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+"Hence got the full registry.");
getAndStoreFullRegistry();
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
StringreconcileHashCode="";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//更新本地注册表updateDelta(delta);
reconcileHashCode=getReconcileHashCode(applications);
            } finally {
fetchRegistryUpdateLock.unlock();
            }
        } else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) ||clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall        }
    } else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

这里依然是通过eurekaTransport.queryClient.获取到服务注册列表,和全量不一样的地方是和本地的Applications中的服务列表做对比,把新的服务注册信息添加到Applications做更新操作

4.服务发现总结

1.每30s/次定时任务调用DiscoveryClient.CacheRefreshThread内部类(Runnable)进行服务注册表的拉取

2.CacheRefreshThread的run方法调用DiscoveryClient.fetchRegistry()拉取服务注册表

3.fetchRegistry方法中先通过Applications applications = getApplications();从本地缓存中获取服务注册表Applications ,然后做出2种处理全量更新discoveryClient.getAndStoreFullRegistry,或者差别更新discoveryClient.getAndUpdateDelta

4.系统启动的时候会做全量更新,通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,获取到注册表存储到本地缓存Applications

5.后续的定时更新注册表都是采用差别更新,也是通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,然后进行一个对比,更新本地的注册表Applications

目录
相关文章
|
5月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
926 3
|
3月前
|
负载均衡 Java API
《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
|
5月前
|
设计模式 Java 开发者
如何快速上手【Spring AOP】?从动态代理到源码剖析(下篇)
Spring AOP的实现本质上依赖于代理模式这一经典设计模式。代理模式通过引入代理对象作为目标对象的中间层,实现了对目标对象访问的控制与增强,其核心价值在于解耦核心业务逻辑与横切关注点。在框架设计中,这种模式广泛用于实现功能扩展(如远程调用、延迟加载)、行为拦截(如权限校验、异常处理)等场景,为系统提供了更高的灵活性和可维护性。
|
9月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
559 70
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
404 2
|
10月前
|
负载均衡 Dubbo Java
Spring Cloud Alibaba与Spring Cloud区别和联系?
Spring Cloud Alibaba与Spring Cloud区别和联系?
|
11月前
|
前端开发 Java Nacos
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
本文介绍了如何使用Spring Cloud Alibaba 2023.0.0.0技术栈构建微服务网关,以应对微服务架构中流量治理与安全管控的复杂性。通过一个包含鉴权服务、文件服务和主服务的项目,详细讲解了网关的整合与功能开发。首先,通过统一路由配置,将所有请求集中到网关进行管理;其次,实现了限流防刷功能,防止恶意刷接口;最后,添加了登录鉴权机制,确保用户身份验证。整个过程结合Nacos注册中心,确保服务注册与配置管理的高效性。通过这些实践,帮助开发者更好地理解和应用微服务网关。
1934 0
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
|
12月前
|
Java Spring 容器
springcloud-config客户端启用服务发现报错找不到bean EurekaHttpClient
解决 Spring Cloud Config 客户端启用服务发现时报错找不到 bean `EurekaHttpClient` 的问题,主要涉及版本兼容性、依赖配置和正确的配置文件设置。通过检查依赖版本、添加必要的依赖项、配置文件的正确性以及启用服务发现注解,可以有效解决此问题。确保日志中没有其他错误信息也是关键步骤之一。通过这些方法,可以确保 Spring Cloud Config 与 Eureka 客户端正常工作。
253 6

热门文章

最新文章