四.SpringCloud源码剖析-Eureka Client服务发现

简介: 什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《[Eureak服务注册](https://blog.csdn.net/u014494148/article/details/106907911)》流程,这一章节我们来研究一下Eureak服务发现

系列文章目录

一.SpringCloud源码剖析-Eureka核心API

二.SpringCloud源码剖析-Eureka Client 初始化过程

三.SpringCloud源码剖析-Eureka服务注册

四.SpringCloud源码剖析-Eureka服务发现

五.SpringCloud源码剖析-Eureka Client服务续约

六.SpringCloud源码剖析-Eureka Client取消注册

七.SpringCloud源码剖析-Eureka Server的自动配置

八.SpringCloud源码剖析-Eureka Server初始化流程

九.SpringCloud源码剖析-Eureka Server服务注册流程

十.SpringCloud源码剖析-Eureka Server服务续约

十一.SpringCloud源码剖析-Eureka Server服务注册表拉取

十二.SpringCloud源码剖析-Eureka Server服务剔除

十三.SpringCloud源码剖析-Eureka Server服务下线

前言

什么是服务发现?微服务启动,所有服务提供者会向EurekaServer注册自己,从而在EurekaServer形成一个服务注册表,而消费者服务会定时从EurekaServer拉取服务注册表并缓存到本地,这个流程叫服务注册。当消费者服务向提供者服务发起调用时就会从服务注册表中找到目标服务的通信地址发起访问,那么EurekaClient是怎么从EurekaServer拉取服务注册表的呢?前一章节我们研究了《Eureak服务注册》流程,这一章节我们来研究一下Eureak服务发现。

1.初始化服务发现定时任务

学习过上一章节我们知道,在DiscoveryClient初始化过程中会初始化很多的定时任务其中就有对服务发现定时任务,如下:

  /**
      初始化定时调度任务
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
   
   
        //判断是否开启服务发现功能
        if (clientConfig.shouldFetchRegistry()) {
   
   
            // registry cache refresh timer
            //刷新注册表定时任务时间间隔
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //定时任务调度器
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            //属性注册表任务
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
  ....省略代码....

首先判断了clientConfig配置中是否开启了服务发现功能(默认开启)shouldFetchRegistry,然后获取到服务发现定时任务间隔时间registryFetchIntervalSeconds(30s)后就初始化了服务发现的定时任务CacheRefreshThread,它本身是一个Runnable它是Discovery的内部类 ,跟踪进去

 class CacheRefreshThread implements Runnable {
   
   
        public void run() {
   
   
            //调用刷新注册表方法
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
   
   
        try {
   
   
            //这里在获取Regions 不为空,默认为空
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            //这里在获取eureka:client:fetch-remote-regions-registry配置,即远程regions,默认为空
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
   
   
                ....省略.....
            }
            //开始获取注册表fetchRegistry(false);
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
   
   
                //获取注册表大小
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            if (logger.isDebugEnabled()) {
   
   
              ...省略...
            }
        } catch (Throwable e) {
   
   
            logger.error("Cannot fetch registry from server", e);
        }
    }

...省略代码...

CacheRefreshThread.refreshRegistry方法 中首先会拉取Regions区域列表,默认为空,然后执行Discovery.fetchRegistry方法拉取注册表,继续跟踪下去

 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
   
   
         //计算器开始
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
   
   
            // If the delta is disabled or if it is the first time, get all
            // applications
            //这里是获取本地所有的注册的服务应用,里面包含了注册的所有的服务
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
   
   
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                //获取并存储完整注册表
                getAndStoreFullRegistry();
            } else {
   
   
                //从eureka服务器获取注册表信息,差别获取,并在本地更新
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applicat
...省略代码...

在fetchRegistry方法中,首先会通过getApplications();得到Applications本地注册服务列表,然后这里有两种情况,一是调用getAndStoreFullRegistry从eureka服务器全量获取注册表,而是调用getAndUpdateDelta(applications);从eureka服务器获取有差别注册表信息,并在本地更新,在项目刚启动的时候会使用全量,后续会采用差别获取,

2.注册表全量获取

DiscoveryClient.getAndStoreFullRegistry的作用是获得并存储完整的注册表,跟踪进去

private void getAndStoreFullRegistry() throws Throwable {
   
   
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        //通过eurekaTransport.queryClient得到一个EurekaHttpClient
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
   
   
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
   
   
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
   
   
            //存储注册表到Applications中
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
   
   
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

我们可以看到 eurekaTransport.queryClient 得到一个EurekaHttpClient,使用的是其装饰器EurekaHttpClientDecorator.getApplications方法获取服务注册列表,这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域,然后调用AbstractJerseyEurekaHttpClient.getApplications获取所有的服务注册列表,跟踪一下源码

@Override
    public EurekaHttpResponse<Applications> getApplications(final String... regions) {
   
   
        return execute(new RequestExecutor<Applications>() {
   
   
            @Override
            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
   
   
                //最终会调用AbstractJerseyEurekaHttpClient获取注册表
                return delegate.getApplications(regions);
            }

            @Override
            public RequestType getRequestType() {
   
   
                return RequestType.GetApplications;
            }
        });
    }

这里通过装饰类,先后会执行
RetryableEurekaHttpClient(失败重试),
RedirectingEurekaHttpClient(重定向到不同的EurekaServer)
MetricsCollectingEurekaHttpClient(统计执行情况)
最终通过AbstractJerseyEurekaHttpClient,使用jersey发起注册表的获取,源码如下

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
   
   
 @Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
   
   
        return getApplicationsInternal("apps/", regions);
    }
...省略...
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
   
   
        ClientResponse response = null;
        String regionsParamValue = null;
        try {
   
   
            //得到一个web请求,serviceUrl是服务器的地址
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
            if (regions != null && regions.length > 0) {
   
   
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            //发送get请求
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

            Applications applications = null;
            if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
   
   
                //获取返回的服务注册表
                applications = response.getEntity(Applications.class);
            }
            //创建一个相应对象EurekaHttpResponseBuilder
            return anEurekaHttpResponse(response.getStatus(), Applications.class)
                    .headers(headersOf(response))
                    .entity(applications)
                    .build();
        } finally {
   
   
            if (logger.isDebugEnabled()) {
   
   
                logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                        serviceUrl, urlPath,
                        regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                        response == null ? "N/A" : response.getStatus()
                );
            }
            if (response != null) {
   
   
                response.close();
            }
        }
    }

3.注册表差别获取

在系统第一次启动的时候会调用DiscoveryClient.getAndStoreFullRegistry拉取所有的注册表本存储到本地Applications中,之后会定时30s/次使用DiscoveryClient.getAndUpdateDelta(applications);差别更新注册表,源码如下

private void getAndUpdateDelta(Applications applications) throws Throwable {
   
   
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        //发送请求获取注册表
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
   
   
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
   
   
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
   
   
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
   
   
                try {
   
   
                   //更新本地注册表
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
   
   
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
   
   
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
   
   
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
   
   
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

这里依然是通过eurekaTransport.queryClient.获取到服务注册列表,和全量不一样的地方是和本地的Applications中的服务列表做对比,把新的服务注册信息添加到Applications做更新操作

4.服务发现总结

在这里插入图片描述

1.每30s/次定时任务调用DiscoveryClient.CacheRefreshThread内部类(Runnable)进行服务注册表的拉取

2.CacheRefreshThread的run方法调用DiscoveryClient.fetchRegistry()拉取服务注册表

3.fetchRegistry方法中先通过Applications applications = getApplications();从本地缓存中获取服务注册表Applications ,然后做出2种处理全量更新discoveryClient.getAndStoreFullRegistry,或者差别更新discoveryClient.getAndUpdateDelta

4.系统启动的时候会做全量更新,通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,获取到注册表存储到本地缓存Applications

5.后续的定时更新注册表都是采用差别更新,也是通过EurekaHttpClient的装饰EurekaHttpClientDecorator器调用AbstractJerseyEurekaHttpClient使用jersey向EureakServer获取注册表,然后进行一个对比,更新本地的注册表Applications

下一章推荐《Eureka Server服务注册表拉取

相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
2月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
54 2
|
1月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
129 5
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
71 9
|
3月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
194 5
|
3月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
3月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
152 9
|
3月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
3月前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
77 1