SpringCloud源码剖析-Eureka Client服务发现

简介: 我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

前言

什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《Eureak服务注册》流程,这一章节我们来研究一下Eureak服务发现。

1.初始化服务发现定时任务

学习过上一章节我们知道,在DiscoveryClient初始化过程中会初始化很多的定时任务其中就有对服务发现定时任务,如下:

/**初始化定时调度任务* Initializes all scheduled tasks.*/privatevoidinitScheduledTasks() {
//判断是否开启服务发现功能if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer//刷新注册表定时任务时间间隔intregistryFetchIntervalSeconds=clientConfig.getRegistryFetchIntervalSeconds();
intexpBackOffBound=clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//定时任务调度器scheduler.schedule(
newTimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//属性注册表任务newCacheRefreshThread()
            ),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ....省略代码....

首先判断了clientConfig配置中是否开启了服务发现功能(默认开启)shouldFetchRegistry,然后获取到服务发现定时任务间隔时间registryFetchIntervalSeconds(30s)后就初始化了服务发现的定时任务CacheRefreshThread,它本身是一个Runnable它是Discovery的内部类 ,跟踪进去

classCacheRefreshThreadimplementsRunnable {
publicvoidrun() {
//调用刷新注册表方法refreshRegistry();
    }
}
@VisibleForTestingvoidrefreshRegistry() {
try {
//这里在获取Regions 不为空,默认为空booleanisFetchingRemoteRegionRegistries=isFetchingRemoteRegionRegistries();
booleanremoteRegionsModified=false;
// This makes sure that a dynamic change to remote regions to fetch is honored.//这里在获取eureka:client:fetch-remote-regions-registry配置,即远程regions,默认为空StringlatestRemoteRegions=clientConfig.fetchRegistryForRemoteRegions();
if (null!=latestRemoteRegions) {
            ....省略.....
            }
//开始获取注册表fetchRegistry(false);booleansuccess=fetchRegistry(remoteRegionsModified);
if (success) {
//获取注册表大小registrySize=localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp=System.currentTimeMillis();
        }
if (logger.isDebugEnabled()) {
            ...省略...
            }
    } catch (Throwablee) {
logger.error("Cannot fetch registry from server", e);
    }
}
...省略代码...

CacheRefreshThread.refreshRegistry方法 中首先会拉取Regions区域列表,默认为空,然后执行Discovery.fetchRegistry方法拉取注册表,继续跟踪下去

privatebooleanfetchRegistry(booleanforceFullRegistryFetch) {
//计算器开始Stopwatchtracer=FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all// applications//这里是获取本地所有的注册的服务应用,里面包含了注册的所有的服务Applicationsapplications=getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
||forceFullRegistryFetch|| (applications==null)
|| (applications.getRegisteredApplications().size() ==0)
|| (applications.getVersion() ==-1)) //Client application does not have latest library supporting delta        {
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications==null));
logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() ==0));
logger.info("Application version is -1: {}", (applications.getVersion() ==-1));
//获取并存储完整注册表getAndStoreFullRegistry();
        } else {
//从eureka服务器获取注册表信息,差别获取,并在本地更新getAndUpdateDelta(applications);
        }
applications.setAppsHashCode(applicat                                     ...省略代码...                  

在fetchRegistry方法中,首先会通过getApplications();得到Applications本地注册服务列表,然后这里有两种情况,一是调用getAndStoreFullRegistryeureka服务器全量获取注册表,而是调用getAndUpdateDelta(applications);从eureka服务器获取有差别注册表信息,并在本地更新,在项目刚启动的时候会使用全量,后续会采用差别获取,

2.注册表全量获取

DiscoveryClient.getAndStoreFullRegistry的作用是获得并存储完整的注册表,跟踪进去

privatevoidgetAndStoreFullRegistry() throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applicationsapps=null;
//通过eurekaTransport.queryClient得到一个EurekaHttpClientEurekaHttpResponse<Applications>httpResponse=clientConfig.getRegistryRefreshSingleVipAddress() ==null?eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
        : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
apps=httpResponse.getEntity();
    }
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps==null) {
logger.error("The application is null for some reason. Not storing this information");
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
//存储注册表到Applications中localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
logger.warn("Not updating applications as another thread is updating it already");
    }
}

我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

@OverridepublicEurekaHttpResponse<Applications>getApplications(finalString... regions) {
returnexecute(newRequestExecutor<Applications>() {
@OverridepublicEurekaHttpResponse<Applications>execute(EurekaHttpClientdelegate) {
//最终会调用AbstractJerseyEurekaHttpClient获取注册表returndelegate.getApplications(regions);
        }
@OverridepublicRequestTypegetRequestType() {
returnRequestType.GetApplications;
        }
    });
}

这里通过装饰类,先后会执行

RetryableEurekaHttpClient(失败重试),

RedirectingEurekaHttpClient(重定向到不同的EurekaServer)

MetricsCollectingEurekaHttpClient(统计执行情况)

最终通过AbstractJerseyEurekaHttpClient,使用jersey发起注册表的获取,源码如下

publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient {
@OverridepublicEurekaHttpResponse<Applications>getApplications(String... regions) {
returngetApplicationsInternal("apps/", regions);
    }
    ...省略...
privateEurekaHttpResponse<Applications>getApplicationsInternal(StringurlPath, String[] regions) {
ClientResponseresponse=null;
StringregionsParamValue=null;
try {
//得到一个web请求,serviceUrl是服务器的地址WebResourcewebResource=jerseyClient.resource(serviceUrl).path(urlPath);
if (regions!=null&&regions.length>0) {
regionsParamValue=StringUtil.join(regions);
webResource=webResource.queryParam("regions", regionsParamValue);
            }
BuilderrequestBuilder=webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//发送get请求response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applicationsapplications=null;
if (response.getStatus() ==Status.OK.getStatusCode() &&response.hasEntity()) {
//获取返回的服务注册表applications=response.getEntity(Applications.class);
            }
//创建一个相应对象EurekaHttpResponseBuilderreturnanEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
        } finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue==null?"" : "regions="+regionsParamValue,
response==null?"N/A" : response.getStatus()
                            );
            }
if (response!=null) {
response.close();
            }
        }
    }

3.注册表差别获取

在系统第一次启动的时候会调用DiscoveryClient.getAndStoreFullRegistry拉取所有的注册表本存储到本地Applications中,之后会定时30s/次使用DiscoveryClient.getAndUpdateDelta(applications);差别更新注册表,源码如下

privatevoidgetAndUpdateDelta(Applicationsapplications) throwsThrowable {
longcurrentUpdateGeneration=fetchRegistryGeneration.get();
Applicationsdelta=null;
//发送请求获取注册表EurekaHttpResponse<Applications>httpResponse=eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() ==Status.OK.getStatusCode()) {
delta=httpResponse.getEntity();
    }
if (delta==null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+"Hence got the full registry.");
getAndStoreFullRegistry();
    } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration+1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
StringreconcileHashCode="";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//更新本地注册表updateDelta(delta);
reconcileHashCode=getReconcileHashCode(applications);
            } finally {
fetchRegistryUpdateLock.unlock();
            }
        } else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) ||clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall        }
    } else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

这里依然是通过eurekaTransport.queryClient.获取到服务注册列表,和全量不一样的地方是和本地的Applications中的服务列表做对比,把新的服务注册信息添加到Applications做更新操作

4.服务发现总结

1.每30s/次定时任务调用DiscoveryClient.CacheRefreshThread内部类(Runnable)进行服务注册表的拉取

2.CacheRefreshThread的run方法调用DiscoveryClient.fetchRegistry()拉取服务注册表

3.fetchRegistry方法中先通过Applications applications = getApplications();从本地缓存中获取服务注册表Applications ,然后做出2种处理全量更新discoveryClient.getAndStoreFullRegistry,或者差别更新discoveryClient.getAndUpdateDelta

4.系统启动的时候会做全量更新,通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,获取到注册表存储到本地缓存Applications

5.后续的定时更新注册表都是采用差别更新,也是通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,然后进行一个对比,更新本地的注册表Applications

目录
相关文章
|
3月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
130 2
|
3月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
158 5
|
21天前
|
Java Spring 容器
springcloud-config客户端启用服务发现报错找不到bean EurekaHttpClient
解决 Spring Cloud Config 客户端启用服务发现时报错找不到 bean `EurekaHttpClient` 的问题,主要涉及版本兼容性、依赖配置和正确的配置文件设置。通过检查依赖版本、添加必要的依赖项、配置文件的正确性以及启用服务发现注解,可以有效解决此问题。确保日志中没有其他错误信息也是关键步骤之一。通过这些方法,可以确保 Spring Cloud Config 与 Eureka 客户端正常工作。
24 6
|
30天前
|
人工智能 安全 Java
AI 时代:从 Spring Cloud Alibaba 到 Spring AI Alibaba
本次分享由阿里云智能集团云原生微服务技术负责人李艳林主讲,主题为“AI时代:从Spring Cloud Alibaba到Spring AI Alibaba”。内容涵盖应用架构演进、AI agent框架发展趋势及Spring AI Alibaba的重磅发布。分享介绍了AI原生架构与传统架构的融合,强调了API优先、事件驱动和AI运维的重要性。同时,详细解析了Spring AI Alibaba的三层抽象设计,包括模型支持、工作流智能体编排及生产可用性构建能力,确保安全合规、高效部署与可观测性。最后,结合实际案例展示了如何利用私域数据优化AI应用,提升业务价值。
123 4
|
1月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
2月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
74 2
|
2月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
206 5
|
3月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
96 9
|
4月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
236 5
|
4月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)