SpringCloud源码阅读2-Eureka客户端的秘密(中)

简介: SpringCloud源码阅读2-Eureka客户端的秘密(中)

2.1.2 fetchRegistry

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            //获取本地缓存
            Applications applications = getApplications();
      //如果增量拉取被禁用或是第一次拉取,全量拉取server端已经注册的服务实例信息
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                //获取全部实例
                getAndStoreFullRegistry();
            } else {
              //增量拉取服务实例
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // 刷新本地缓存
        onCacheRefreshed();
        // 基于缓存中的实例数据更新远程实例状态, (发布StatusChangeEvent)
        updateInstanceRemoteStatus();
       // 注册表拉取成功后返回true
        return true;
    }

全量获取最终调用

EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());

增量获取

EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());

可以看出,客户端与服务端通信底层是EurekaTransport在提供支持。


2.1.3 initScheduledTasks

在此之前有必要说说:TimedSupervisorTask。 TimedSupervisorTask 是自动调节间隔的周期性任务,当不超时,将以初始化的间隔执行。当任务超时时,将下一个周期的间隔调大。每次超时都会增大相应倍数,直到外部设置的最大参数。一旦新任务不再超时,间隔自动恢复默认值。

也就是说,这是一个具有自适应的周期性任务。(非常棒的设计啊)

private void initScheduledTasks() {
    //1.如果获取服务列表,则创建周期性缓存更新(即获取服务列表任务)任务
        if (clientConfig.shouldFetchRegistry()) {
          //初始间隔时间(默认30秒)
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            //最大倍数 默认10倍
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //执行TimedSupervisorTask ,监督CacheRefreshThread任务的执行。
            //具体执行线程池cacheRefreshExecutor,具体任务CacheRefreshThread
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()//缓存刷新,调用fetchRegistry()获取服务列表
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    //2. 如何注册,就创建周期性续租任务,维持心跳。
        if (clientConfig.shouldRegisterWithEureka()) {
          //心跳间隔,默认30秒。
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            //最大倍数 默认10倍
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            //执行TimedSupervisorTask ,监督HeartbeatThread任务的执行。
            //具体执行线程池heartbeatExecutor,具体任务HeartbeatThread
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            //3.创建应用实例信息复制器。
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
      //4.创建状态改变监听器,监听StatusChangeEvent 
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //状态有变化,使用信息复制器,执行一个任务,更新状态变化到注册中心
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
      //是否关注状态变化,将监听器添加到applicationInfoManager
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 启动InstanceInfo复制器
       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

总结下initScheduledTasks()的工作: 如何配置获取服务

  1. 创建监控任务线程cacheRefresh,监督服务获取CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)线程的执行,默认每30秒执行一次。获取服务列表更新到本地缓存。任务内容refreshRegistry,本地缓存localRegionApps

如何配置注册,

  1. 创建监控任务线程"heartbeat",监督续约任务HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)的执行,默认每30秒执行一次。任务内容为renew()
  2. 创建实例状态监听器,监听当前实例的状态变化,并通过InstanceInfo复制器,执行onDemandUpdate()方法,更新变化到远程server
  3. 启动InstanceInfo复制器定时线程(DiscoveryClient-InstanceInfoReplicator-%d),定时(默认40秒)检测当前实例的DataCenterInfo、LeaseInfo、InstanceStatus,如果有变更,执行InstanceInfoReplicator.this.run()方法将变更信息同步到server

下面我们看看这几个重要的任务内容:

  • refreshRegistry刷新缓存: refreshRegistry最终调用fetchRegistry获取服务列表,更新本地缓存。fetchRegistry在DiscoveryClient初始化时,主动获取一次;之后都是定时获取。
  • renew() 续约:通过eurekaTransport.registrationClient.sendHeartBeat向server发送当前实例信息
  • InstanceInfoReplicator.onDemandUpdate() 状态更新: 一旦有状态变化,停掉定时复制线程,立刻把状态更新到server.  最终调用的是InstanceInfoReplicator.this.run()
  • InstanceInfoReplicator.this.run() 复制当前实例信息到Server: 状态变化时,立即执行;平时每40秒执行一次。


相关文章
|
22小时前
|
负载均衡 Java 开发者
细解微服务架构实践:如何使用Spring Cloud进行Java微服务治理
【6月更文挑战第30天】Spring Cloud是Java微服务治理明星框架,整合Eureka(服务发现)、Ribbon(客户端负载均衡)、Hystrix(断路器)、Zuul(API网关)和Config Server(配置中心),提供完整服务治理解决方案。通过Eureka实现服务注册与发现,Ribbon进行负载均衡,Hystrix确保服务容错,Config Server集中管理配置,Zuul则作为API入口统一处理请求。理解和使用Spring Cloud是现代Java开发者的关键技能。
13 2
|
13小时前
|
负载均衡 安全 Java
Spring Cloud中的服务网格实现
Spring Cloud中的服务网格实现
|
2天前
|
负载均衡 Java API
使用Spring Cloud构建Java微服务架构
使用Spring Cloud构建Java微服务架构
|
2天前
|
负载均衡 算法 Java
Spring Cloud Netflix 之 Ribbon
Spring Cloud Netflix Ribbon是客户端负载均衡器,用于在微服务架构中分发请求。它与RestTemplate结合,自动在服务发现(如Eureka)注册的服务之间进行调用。配置包括在pom.xml中添加依赖,设置application.yml以连接Eureka服务器,并在配置类中创建@LoadBalanced的RestTemplate。通过这种方式,当调用如`/user/userInfoList`的接口时,Ribbon会自动处理到多个可用服务实例的负载均衡。
|
4天前
|
Java API 数据格式
Spring三兄弟:Spring、Spring Boot、Spring Cloud的100个常用注解大盘点
Spring三兄弟:Spring、Spring Boot、Spring Cloud的100个常用注解大盘点
|
2天前
|
Java API 开发者
Spring Cloud Gateway中的GlobalFilter:构建强大的API网关过滤器
Spring Cloud Gateway中的GlobalFilter:构建强大的API网关过滤器
7 0
|
2天前
|
Java Maven 微服务
Spring Cloud Netflix 之 Eureka
Spring Cloud Netflix Eureka是服务发现组件,由Netflix开发,Spring Cloud集成为微服务治理工具。Eureka采用客户端/服务器架构,包含Eureka Server(服务注册中心)和Eureka Client(服务提供者和服务消费者)。服务提供者注册到Eureka Server,服务消费者通过它查找并调用服务。
|
2天前
|
负载均衡 前端开发 Java
Spring Cloud 之 OpenFeign
Spring Cloud OpenFeign是Spring官方的声明式服务调用组件,简化了远程服务调用,使其如同调用本地方法。核心注解包括`@FeignClient`、`@EnableFeignClients`、`@GetMapping`和`@PostMapping`。实践中,通过在`pom.xml`添加依赖,创建Feign接口,配置`@FeignClient`,在启动类启用Feign,以及自定义超时设置来实现远程调用和负载均衡。
|
2天前
|
监控 Java 微服务
Spring Cloud 之 Hystrix
Spring Cloud Hystrix 是一个用于处理分布式系统延迟和容错的库,防止雪崩效应。它作为断路器,当服务故障时通过监控短路,返回备用响应,保持系统弹性。主要功能包括服务降级和熔断:
|
2天前
|
监控 Java API
Spring Cloud 之 GateWay
Spring Cloud Gateway 作为API网关,处理客户端与微服务间的非业务逻辑,如权限验证、监控、路由转发。它通过Route(含ID、目标URI、Predicate和Filter)、Predicate(匹配请求条件)和Filter(请求前/后处理)实现动态路由。工作流程包括客户端请求-&gt;Gateway Handler Mapping-&gt;过滤器链-&gt;服务转发-&gt;响应过滤-&gt;回客户端。过滤器用于请求拦截、响应处理,如参数校验、权限检查。动态路由允许以服务名创建路由,实现服务发现。预设和全局过滤器用于特定或所有路由的定制逻辑,例如登录验证和请求头管理。