十一.SpringCloud源码剖析-Eureka Server服务注册表拉取

简介: 这一章我们来分析一下Eureka Server 服务注册表的拉取流程,请结合《[Eureka Client服务发现](https://blog.csdn.net/u014494148/article/details/108217656)》

系列文章目录

一.SpringCloud源码剖析-Eureka核心API

二.SpringCloud源码剖析-Eureka Client 初始化过程

三.SpringCloud源码剖析-Eureka服务注册

四.SpringCloud源码剖析-Eureka服务发现

五.SpringCloud源码剖析-Eureka Client服务续约

六.SpringCloud源码剖析-Eureka Client取消注册

七.SpringCloud源码剖析-Eureka Server的自动配置

八.SpringCloud源码剖析-Eureka Server初始化流程

九.SpringCloud源码剖析-Eureka Server服务注册流程

十.SpringCloud源码剖析-Eureka Server服务续约

十一.SpringCloud源码剖析-Eureka Server服务注册表拉取

十二.SpringCloud源码剖析-Eureka Server服务剔除

十三.SpringCloud源码剖析-Eureka Server服务下线

前言

这一章我们来分析一下Eureka Server 服务注册表的拉取流程,请结合《Eureka Client服务发现

<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">

在《Eureka Client服务发现》我们分析了,客户端会通过两种方式从服务端拉取注册表,在客户端系统启动的时候会进行全量拉取,随后默认30s/次会进行差异更新,那么在Eureka Server 服务端是如何处理服务注册表全量拉取和差异更新的呢?

全量拉取

Eureka Client向Eureka Server发请求,拉取服务注册表,Server端还是通过ServeltContainer接待请求,最终交给com.netflix.eureka.resources.ApplicationsResource#getContainers处理

/**
    返回所有的应用
     * Get information about all {@link com.netflix.discovery.shared.Applications}.
*/
 @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
   

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
   
                //注册表全量拉取统计计数增加
            EurekaMonitors.GET_ALL.increment();
        } else {
   
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        //检查服务端时候准备好可以被访问
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
   
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        //处理返回的数据类型默认JSON
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
   
            //请求头么有指定格式,返回XML格式
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        //创建缓存key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,    //通过ALL_APPS构建key 
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
         //这里判断是否是GZIP格式,返回结果的编码类型不一样,获取方式是一致的
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
   
            //如果格式是gzip,调用responseCache.getGZIP(cacheKey)获取
            //底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
   
            //普通获取responseCache.get(cacheKey)
            //底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }

responseCache.getGZIP(cacheKey)最终会调用 com.netflix.eureka.registry.ResponseCacheImpl#getValue


    /**
     * Get the payload in both compressed and uncompressed form.
     */
    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
   
        Value payload = null;
        try {
   
            if (useReadOnlyCache) {
   
                //从只读缓存中获取
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
   
                    payload = currentPayload;
                } else {
   
                    //如果只读缓存中获取不到,从读写缓存中获取
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
   
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
   
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

差异更新

差异更新也在ApplicationsResource中:com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential,源码如下

  /**
          获取Applications服务注册表中有改变的服务,注册,取消,状态更改和过期都会造成服务的改变
     * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
     *
     * <p>
     * The delta changes represent the registry information change for a period
     * as configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
     * changes that can happen in a registry include
     * <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
     * the changes to the registry are infrequent and hence getting just the
     * delta will be much more efficient than getting the complete registry.
     * </p>
     *
     * <p>
     * Since the delta information is cached over a period of time, the requests
     * may return the same data multiple times within the window configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
     * are expected to handle this duplicate information.
     * <p>
     *
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve  JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo  the {@link java.net.URI} information of the request made.
     * @return response containing the delta information of the
     *         {@link AbstractInstanceRegistry}.
     */
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
   

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        //如果禁用了Delta注册表差异化拉取,或者服务不可访问,返回拒绝
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
   
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
   
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
   
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
        }

        CurrentRequestVersion.set(Version.toEnum(version));
        //处理反会的数据格式JSON默认
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
   
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        //构建缓存key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,    //通过ALL_APPS_DELTA构建key 
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null
                && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
   
                //从responseCache获取内容
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
   
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }
    }

总结

Eureka Server 拉取服务注册表的逻辑还是比较简单的,不管是全量拉取,还是差别拉取都是通过ApplicationsResource中处理,然后构建出不同的key,从ResponseCache中去获取服务。

相关文章
|
14天前
|
负载均衡 监控 Java
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
|
3天前
|
运维 Java Nacos
Spring Cloud应用框架:Nacos作为服务注册中心和配置中心
Spring Cloud应用框架:Nacos作为服务注册中心和配置中心
|
22天前
|
Java Spring 容器
Spring Boot 启动源码解析结合Spring Bean生命周期分析
Spring Boot 启动源码解析结合Spring Bean生命周期分析
60 11
|
22天前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
41 3
|
23天前
|
缓存 Java 程序员
spring IoC 源码
spring IoC 源码
38 3
|
1天前
|
Java 应用服务中间件 数据库
SpringCloud:服务保护和分布式事务详解
SpringCloud:服务保护和分布式事务详解
11 0
|
2天前
|
缓存 Java Maven
SpringCloud基于Eureka的服务治理架构搭建与测试:从服务提供者到消费者的完整流程
Spring Cloud微服务框架中的Eureka是一个用于服务发现和注册的基础组件,它基于RESTful风格,为微服务架构提供了关键的服务注册与发现功能。以下是对Eureka的详细解析和搭建举例。
|
1月前
|
Java API 开发工具
Spring Boot与Spring Cloud Config的集成
Spring Boot与Spring Cloud Config的集成
|
26天前
|
监控 NoSQL Java
通用快照方案问题之Martin Flower提出的微服务之间的通信如何解决
通用快照方案问题之Martin Flower提出的微服务之间的通信如何解决
31 0
|
28天前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14208 19