三.SpringCloud源码剖析-Eureka Client服务注册

简介: 在上一章《[Eureka Client 初始化过程](https://blog.csdn.net/u014494148/article/details/107982920)》中我们了解到,应用程序在启动的时候就会初始化Eureka并触发Eureka的自动注册,最终会调用`DiscoveryClient`进行服务注册,我们来跟踪一下DiscoveryClient是如何实现服务注册与发现的。

系列文章目录

一.SpringCloud源码剖析-Eureka核心API

二.SpringCloud源码剖析-Eureka Client 初始化过程

三.SpringCloud源码剖析-Eureka服务注册

四.SpringCloud源码剖析-Eureka服务发现

五.SpringCloud源码剖析-Eureka Client服务续约

六.SpringCloud源码剖析-Eureka Client取消注册

七.SpringCloud源码剖析-Eureka Server的自动配置

八.SpringCloud源码剖析-Eureka Server初始化流程

九.SpringCloud源码剖析-Eureka Server服务注册流程

十.SpringCloud源码剖析-Eureka Server服务续约

十一.SpringCloud源码剖析-Eureka Server服务注册表拉取

十二.SpringCloud源码剖析-Eureka Server服务剔除

十三.SpringCloud源码剖析-Eureka Server服务下线

前言

文章过长,您需要有些耐心!!!

在上一章《Eureka Client 初始化过程》中我们了解到,应用程序在启动的时候就会初始化Eureka并触发Eureka的自动注册,最终会调用DiscoveryClient进行服务注册,我们来跟踪一下DiscoveryClient是如何实现服务注册与发现的。

1.DiscoveryClient 初始化定时任务

程序启动EurekaClientAutoConfiguration被加载,EurekaClientEurekaClientAutoConfiguration 中通过“延迟@Lazy”注册。同时EurekaAutoServiceRegistration 监听启动事件,调用 EurekaServiceRegistry的register方法进行注册,该方法会触发EurekaClient的创建 。自动配置类中有如下代码

    //可刷新的Eureka客户端配置
    @Configuration
    @ConditionalOnRefreshScope
    protected static class RefreshableEurekaClientConfiguration {
   
   

        @Autowired
        private ApplicationContext context;

        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
        //注册EurekaClient
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
   
   
            //初始化InstanceInfo,服务实例注册信息对象
            manager.getInfo(); // force initialization
            //创建CloudEurekaClient ,他是 EurekaClient的实现类
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }
        ...省略...

这里初始化InstanceInfo服务注册实例之后,创建了EurekaClient 客户端,通过子类 CloudEurekaClient进行创建,只不过这里是@Lazy延迟创建,在,跟踪下去,我们看一下CloudEurekaClient是如何创建的

public class CloudEurekaClient extends DiscoveryClient {
   
   
    ...省略...
    private ApplicationInfoManager applicationInfoManager;
    private AtomicReference<EurekaHttpClient> eurekaHttpClient = new AtomicReference<>();

    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                             EurekaClientConfig config, ApplicationEventPublisher publisher) {
   
   
        this(applicationInfoManager, config, null, publisher);
    }
    //创建CloudEurekaClient的构造器
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                             EurekaClientConfig config,
                             AbstractDiscoveryClientOptionalArgs<?> args,
                             ApplicationEventPublisher publisher) {
   
   
        //通过父类DiscoveryClient进行初始化
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }

这里super(applicationInfoManager, config, args);通过父类初始化他的父类是DiscoveryClient,DiscoveryClient的父接口是EurekaClient
在这里插入图片描述
我们继续跟踪上去DiscoveryClient的构造器


@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
   
   
        if (args != null) {
   
   
            //健康检查处理器
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            //健康检查回调
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
   
   
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        //InstanceInfo管理器
        this.applicationInfoManager = applicationInfoManager;
        //InstanceInfo服务实例注册信息,注册的对象
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        //eureka客户端配置
        clientConfig = config;
        staticClientConfig = clientConfig;
        //和eurekaServrer的通信配置,该配置通过EurekaClientConfigBean来创建
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
   
   
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
   
   
            logger.warn("Setting instanceInfo to a passed in null value");
        }
        //备份注册表
        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
        //注册表过时监控
        if (config.shouldFetchRegistry()) {
   
   
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{
   
   15L, 30L, 60L, 120L, 240L, 480L});
        } else {
   
   
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        //心跳监控
        if (config.shouldRegisterWithEureka()) {
   
   
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{
   
   15L, 30L, 60L, 120L, 240L, 480L});
        } else {
   
   
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
        //如果不注册,不拉取注册列表,置空相关的定时任务以及相关配置
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
   
   
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
   
   
            //初始化线程池======================================
            //创建定时任务执行器,核心数2
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            //带线程池的执行器,心跳线程池执行器
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //刷新服务列表缓存线程执行器
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //初始化和EurekaServer交互的客户端EurekaHttpClient,
            //会创建服务注册的EurekaHttpClient和拉取注册表的EurekaHttpClient
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
   
   
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
   
   
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
   
   
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
   
   
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        //从来备份中拉取注册表,底层没做实现
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
   
   
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
   
   
            this.preRegistrationHandler.beforeRegistration();
        }
        //如果开启服务注册 
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
   
   
            try {
   
   
                //调用register()方法发起服务注册 ,默认 clientConfig.shouldEnforceRegistrationAtInit是不满足的
                if (!register() ) {
   
   
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
   
   
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        //初始化定时任务
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        try {
   
   
            //监视注册表
            Monitors.registerObject(this);
        } catch (Throwable e) {
   
   
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

构造器中做了一些初始化工作

  • 比如初始化心跳线程执行器,服务列表刷新线程执行器
  • 比如创建EurekaTransport,用来和Eureka交互的客户端,内部创建了EurekaHttpClient来发请求
  • 最后调用initScheduledTasks方法进行定时任务的初始化

    2.initScheduledTasks初始化定时任务

    接下来我们详细看一下initScheduledTasks中的代码:

         /**
         初始化所有定时任务
       * Initializes all scheduled tasks.
       */
      private void initScheduledTasks() {
         
         
          //判断如果要拉取注册表
          if (clientConfig.shouldFetchRegistry()) {
         
         
              //刷新注册表的心跳时间间隔
              // registry cache refresh timer
              int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
              int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
              //刷新注册表的定时任务:CacheRefreshThread刷新注册表
              scheduler.schedule(
                      new TimedSupervisorTask(
                              "cacheRefresh",
                              scheduler,
                              cacheRefreshExecutor,
                              registryFetchIntervalSeconds,
                              TimeUnit.SECONDS,
                              expBackOffBound,
                              new CacheRefreshThread()
                      ),
                      registryFetchIntervalSeconds, TimeUnit.SECONDS);
          }
          //是否要注册到Eureaka
          if (clientConfig.shouldRegisterWithEureka()) {
         
         
              //这里取的是租约更新时间 30s/次
              int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
              int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
              logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
              // Heartbeat timer
              //心跳续约的定时任务 :HeartbeatThread 中进行续约,内部调用DiscoverClient.renew方法
              scheduler.schedule(
                      new TimedSupervisorTask(
                              "heartbeat",
                              scheduler,
                              heartbeatExecutor,
                              renewalIntervalInSecs,
                              TimeUnit.SECONDS,
                              expBackOffBound,
                              new HeartbeatThread()
                      ),
                      renewalIntervalInSecs, TimeUnit.SECONDS);
    
              // InstanceInfo replicator
              //注册实例InstanceInfo的复制器,负责将自身的信息周期性的上报到EurekaServer;
              //,内部会通过定时任务调用(赋值时间40s间隔),检查InstanceInfo(DataCenterInfo,LeaseInfo,InstanceStatus)是否有变化,
              //内部调用InstanceInfoReplicator.run方法,再调用Discoverlient.refreshInstanceInfo()方法通过ApplicationInfoManager刷新实例状态
              //刷新完成后会调用 discoveryClient.register();进行注册
              instanceInfoReplicator = new InstanceInfoReplicator(
                      this,
                      instanceInfo,
                      //定时调度时间间隔,30s/次
                      clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                      2); // burstSize
              //创建注册状态改变监听
              statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
         
         
                  @Override
                  public String getId() {
         
         
                      return "statusChangeListener";
                  }
    
                  @Override
                  public void notify(StatusChangeEvent statusChangeEvent) {
         
         
                      if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                              InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
         
         
                          // log at warn level if DOWN was involved
                          logger.warn("Saw local status change event {}", statusChangeEvent);
                      } else {
         
         
                          logger.info("Saw local status change event {}", statusChangeEvent);
                      }
                      //按需更新,使用scheduler执行一个Runnable任务,任务中调用InstanceInfoReplicator.this.run()方法,
                      //run方法中先执行discoveryClient.refreshInstanceInfo();刷新实例状态,
                         //然后调用discoveryClient.register()注册,如果在DiscoveryClient初始化的时候,
                      instanceInfoReplicator.onDemandUpdate();
                  }
              };
              //通过applicationInfoManager注册监听器
              if (clientConfig.shouldOnDemandUpdateStatusChange()) {
         
         
                  applicationInfoManager.registerStatusChangeListener(statusChangeListener);
              }
    
              //启动InstanceInfo复制器,传入 复制实例的时间间隔
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
          } else {
         
         
              logger.info("Not registering with Eureka server per configuration");
          }
      }
    

    总结一下initScheduledTasks做了如下事情

  • 启动线程调用CacheRefreshThread任务刷新服务列表(registryFetchIntervalSeconds=30s执行一次),从EurekaServer拉取服务注册列表同时刷新客户端缓存
  • 启动heartbeat心跳定时线程(renewalIntervalInSecs=30s续约一次)执行任务HeartbeatThread,调用DiscoverClient.renew定时向Eureka Server发送心跳

  • 启动InstanceInfo复制器定时线程,开启定时线程检查当前检查DataCenterInfo,LeaseInfo,InstanceStatus状态,内部通过isInstanceInfoDirty属性来标记是否有状态改变,如果发现变更就执行discoveryClient.register()将实例信息同步到Server端实现服务注册

  • 通过ApplicationInfoManager注册了StatusChangeListener来监听服务的注册状态改变,当服务状态改变会调用ApplicationInfoManager.setInstanceStatus方法设置服务状态会触发StatusChangeListener监听器,该监听器会调用instanceInfoReplicator.onDemandUpdate();进行按需更新,内部会检查服务变更信息,如果有变更,然后把服务信息注册到Eureaka

需要补充说明的是:在Eureka启动时会自动注册,在EurekaServiceRegistry.register方法会注册EurekaRegistration,方法中会初始化EurekaClient(DiscoveryClient创建)从而注册ApplicationInfoManager中的StatusChangeListener,初始化完成之后会接着执行reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());修改ApplicationInfoManager中的InstatceStatus状态为UPApplicationInfoManagerEurekaClientAutoConfiguration中被创建的时候,里面的InstanceInfo是通过new InstanceInfoFactory().create(config);创建,其中的InstanceStatus默认是STARING,这个状态的改变会触发StatusChangeListener监听器的执行从而触发StatusChangeListener

3.InstanceInfoReplicator复制InstanceInfo注册服务

在initScheduledTasks方法中通过InstanceInfoReplicator复制服务实例,然后调用DiscoverClient.register进行注册

class InstanceInfoReplicator implements Runnable {
   
   
...省略...
//在DiscoverClient初始化的时候调用 延迟40s执行
 public void start(int initialDelayMs) {
   
   
        if (started.compareAndSet(false, true)) {
   
   
            //标记实例有更新(脏数据),这个标记是服务注册的依据
            instanceInfo.setIsDirty();  // for initial register
            //定时调度,30s/次
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);    
            scheduledPeriodicRef.set(next);
        }
    }
    ...省略...
//按需更新,会定时触发服务状态检查和服务注册,见Run方法
public boolean onDemandUpdate() {
   
   
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
   
   
            //定时执行
            if (!scheduler.isShutdown()) {
   
   
                scheduler.submit(new Runnable() {
   
   
                    @Override
                    public void run() {
   
   
                        logger.debug("Executing on-demand update of local InstanceInfo");

                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
   
   
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }

                        //调用run方法
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
   
   
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
   
   
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

    public void run() {
   
   
        try {
   
   
            //刷新InstanceInfo,内部会检查:dataCenter和 LeaseInfo
            //applicationInfoManager.refreshDataCenterInfoIfRequired();
            //applicationInfoManager.refreshLeaseInfoIfRequired();
            //根据HealthCheckHandler获取实例状态InstanceStatus,并更新,如果状态发生变化会触发所有StatusChangeListener
            discoveryClient.refreshInstanceInfo();
            //刷新后,如果实例发生了变更(有脏数据),就发起注册
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
   
   
                //发起注册
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
   
   
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
   
   
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

InstanceInfoReplicator的run方法会在2个地方被调用,一是会在DiscoverClient初始化定时任务initScheduledTasks方法中被创建调用instanceInfoReplicator.start 延迟40s执行,而是调用ApplicationInfoManager.setInstanceStatus改变了实例状态,触发StatusChangeListener监听器,在监听器内部会触发InstanceInfoReplicator.onDemandUpdate方法,然后调用run方法。

4.DiscoveryClient.register服务注册

通过上面的源码跟踪我们知道,Eureka通过DiscoverClient发起服务注册

 /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
   
   
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        //http响应
        EurekaHttpResponse<Void> httpResponse;
        try {
   
   
        //提交注册,发起http请求,eurekaTransport是在DiscoveryClient初始化的时候创建的和EurekaServer交互的客户端,这里是把InstanceInfo作为注册实例提交到EurekaServer
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
   
   
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
   
   
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

这里通过调用:eurekaTransport.registrationClient.register(instanceInfo);得到一个EurekaHttpClient,然后调用register方法向EurekaServer发起http请求实现注册,eurekaTransport是在DiscoveryClient初始化的时候创建的和EurekaServer交互的客户端,这里是把InstanceInfo作为注册实例提交到EurekaServer,继续跟踪下去

public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
   
   
    @Override
    public EurekaHttpResponse<Void> register(final InstanceInfo info) {
   
   
        return execute(new RequestExecutor<Void>() {
   
   
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
   
   
                //发起注册请求
                return delegate.register(info);
            }

            @Override
            public RequestType getRequestType() {
   
   
                return RequestType.Register;
            }
        });
    }

代码执行到EurekaHttpClient的装饰类EurekaHttpClientDecorator,先后会执行RetryableEurekaHttpClient(Http请求失败进行重试),
RedirectingEurekaHttpClient(重定向到不同的EurekaServer)
MetricsCollectingEurekaHttpClient(统计执行指标)
AbstractJerseyEurekaHttpClient(使用jersey发起注册)
继续跟踪,会调用AbstractJerseyEurekaHttpClient的register方法注册

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
   
   
@Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
   
   
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
   
   
        //jerseyClient其实是ApacheHttpClient
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
   
   
            if (logger.isDebugEnabled()) {
   
   
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
   
   
                response.close();
            }
        }
    }

看到这里我们应该就明白了,在register方法中获取到了 serviceUrl 即配置文件中的注册服务地址,把InstanceInfo作为参数,底层通过jersey(rest)来发请求然后会调用WebResource发送post请求,实现服务注册。

public class WebResource extends Filterable implements RequestBuilder<WebResource.Builder>, UniformInterface {
   
   
        //发送post请求
         public <T> T post(Class<T> c, Object requestEntity) throws UniformInterfaceException, ClientHandlerException {
   
   
            return WebResource.this.handle(c, this.build("POST", requestEntity));
        }
        ....省略....

5.服务注册流程总结

在这里插入图片描述

1.程序启动触发EurekaServiceRegistry.register(EurekaRegistration)实现服务自动注册

  • 该方法中会初始化DiscoveryClient,
  • 同时改变ApplicationInfoManager中的InstanceStatus的状态为UP,该状态默认是STARING,该状态的变更会触发StatusChangeListener监听器
  • 给EurekaRegistration.eurekaClient注册了监控检查处理器healthCheckHandler

2.DiscoveryClient初始化流程(构造器)中

  • 创建了心跳线程执行器,服务列表刷新线程执行器,
  • 创建了EurekaTransport(EurekaHttpClient)用来向EureakServer发请求的客户端,
  • 然后调用initScheduledTask初始化定时任务

    3.DiscoveryClient.initScheduledTasks方法中

  • 初始化了刷新注册表的定时任务(CacheRefreshThread),

    • 服务心跳心跳定时任务(HeartbeatThread),
    • 创建了InstanceInfoReplicator实例信息复制器,
    • 还注册了ApplicationInfoManager.StatusChangeListener
    • 最后启动InstanceInfoReplicator复制器

    4.InstanceInfoReplicator复制InstanceInfo注册服务

  • 首先它是一个Runnable 线程,30s定时调度一次,除了在DiscoveryClient.initScheduledTasks中会被触发调用,会被StatusChangeListener监听器中被调用InstanceInfoReplicator.onDemandUpdate(),

  • 在InstanceInfoReplicator的run方法中会调用discoveryClient.refreshInstanceInfo();先检查实例信息是否有变更,如果有变更(通过instanceInfo.setIsDirty()标记)会向Eureak注册,调用discoveryClient.register();进行注册

4.DiscoveryClient.register服务注册

  • 使用的是eurekaTransport获取一个EurekaHttpClient,调用register方法
  • 最终调用AbstractJerseyEurekaHttpClient中的register,发起http把InstanceInfo注册到EureakServer

下一章节推荐《Eureka Server服务注册流程

相关文章
|
20天前
springCloud之服务降级熔断Hystrix、OpenFeign
springCloud之服务降级熔断Hystrix、OpenFeign
16 0
|
6天前
|
Java API 开发工具
Spring Boot与Spring Cloud Config的集成
Spring Boot与Spring Cloud Config的集成
|
13天前
|
负载均衡 安全 Java
Spring Cloud中的服务发现与注册
Spring Cloud中的服务发现与注册
|
13天前
|
监控 Java 开发者
Spring Cloud中的服务熔断与降级
Spring Cloud中的服务熔断与降级
|
18天前
|
Java API 数据格式
Spring三兄弟:Spring、Spring Boot、Spring Cloud的100个常用注解大盘点
Spring三兄弟:Spring、Spring Boot、Spring Cloud的100个常用注解大盘点
|
7天前
|
负载均衡 Java 微服务
深入理解Spring Cloud中的服务发现与注册
深入理解Spring Cloud中的服务发现与注册
|
12天前
|
Java API 网络架构
Spring Boot与Spring Cloud Gateway的集成
Spring Boot与Spring Cloud Gateway的集成
|
14天前
|
负载均衡 监控 Java
深入理解Spring Boot与Spring Cloud的整合方式
深入理解Spring Boot与Spring Cloud的整合方式
|
17天前
|
Java Maven 微服务
Spring Cloud Netflix 之 Eureka
Spring Cloud Netflix Eureka是服务发现组件,由Netflix开发,Spring Cloud集成为微服务治理工具。Eureka采用客户端/服务器架构,包含Eureka Server(服务注册中心)和Eureka Client(服务提供者和服务消费者)。服务提供者注册到Eureka Server,服务消费者通过它查找并调用服务。
|
17天前
|
JSON Java Spring
实战SpringCloud响应式微服务系列教程(第八章)构建响应式RESTful服务
实战SpringCloud响应式微服务系列教程(第八章)构建响应式RESTful服务