Spring Cloud Eureka 源码解析(下)

简介: 本文主要是从 Eureka 源码的角度分析 Eureka 的实现原理和业务细节流程, 在本文的开头也给出了集群模式服务端的配置以及客户端的配置 demo.

Eureka Client 源码


Eureka Client  服务更新访问的接口信息


  1. 服务初始化注册
  2. 服务发送心跳信息
  3. 服务列表拉取,全量拉取
  4. 服务列表拉取,增量拉取


一.  初始化过程


// DiscoveryClient
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
  if (args != null) {
    this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
    this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
    this.eventListeners.addAll(args.getEventListeners());
    this.preRegistrationHandler = args.preRegistrationHandler;
  } else {
    this.healthCheckCallbackProvider = null;
    this.healthCheckHandlerProvider = null;
    this.preRegistrationHandler = null;
  }
  this.applicationInfoManager = applicationInfoManager;
  InstanceInfo myInfo = applicationInfoManager.getInfo();
  clientConfig = config;
  staticClientConfig = clientConfig;
  transportConfig = config.getTransportConfig();
  instanceInfo = myInfo;
  if (myInfo != null) {
    appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
  } else {
    logger.warn("Setting instanceInfo to a passed in null value");
  }
  this.backupRegistryProvider = backupRegistryProvider;
  this.endpointRandomizer = endpointRandomizer;
  this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
  localRegionApps.set(new Applications());
  fetchRegistryGeneration = new AtomicLong(0);
  remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
  remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
  if (config.shouldFetchRegistry()) {
    this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
  } else {
    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  }
  if (config.shouldRegisterWithEureka()) {
    this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
  } else {
    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  }
  logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
  if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
    logger.info("Client configured to neither register nor query for data.");
    scheduler = null;
    heartbeatExecutor = null;
    cacheRefreshExecutor = null;
    eurekaTransport = null;
    instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);
    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
    return;  // no need to setup up an network tasks and we are done
  }
  try {
    // default size of 2 - 1 each for heartbeat and cacheRefresh
    // 定时任务的调度类
    scheduler = Executors.newScheduledThreadPool(2,
                                                 new ThreadFactoryBuilder()
                                                 .setNameFormat("DiscoveryClient-%d")
                                                 .setDaemon(true)
                                                 .build());
    // 心跳的线程池
    heartbeatExecutor = new ThreadPoolExecutor(
      1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
      .setDaemon(true)
      .build()
    );  // use direct handoff
    // 配置刷新的线程池
    cacheRefreshExecutor = new ThreadPoolExecutor(
      1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
      .setDaemon(true)
      .build()
    );  // use direct handoff
    eurekaTransport = new EurekaTransport();
    scheduleServerEndpointTask(eurekaTransport, args);
    AzToRegionMapper azToRegionMapper;
    if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
      azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
    } else {
      azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
    }
    if (null != remoteRegionsToFetch.get()) {
      azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
    }
    instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
  } catch (Throwable e) {
    throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
  }
  // shouldFetchRegistry 关闭服务发现,不去拉取其它服务列表,自己只是作为服务提供者
  if (clientConfig.shouldFetchRegistry()) {
    try {
      // 服务发现
      // 1. 先去 eureka 上面拿信息
      // 2. 然后去 eureka 上注册
      boolean primaryFetchRegistryResult = fetchRegistry(false);
      if (!primaryFetchRegistryResult) {
        logger.info("Initial registry fetch from primary servers failed");
      }
      boolean backupFetchRegistryResult = true;
      // 服务端拿不到注册信息,就去备用服务器拿注册信息
      if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
        backupFetchRegistryResult = false;
        logger.info("Initial registry fetch from backup servers failed");
      }
      if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
      }
    } catch (Throwable th) {
      logger.error("Fetch registry error at startup: {}", th.getMessage());
      throw new IllegalStateException(th);
    }
  }
  // call and execute the pre registration handler before all background tasks (inc registration) is started
  if (this.preRegistrationHandler != null) {
    this.preRegistrationHandler.beforeRegistration();
  }
  // shouldRegisterWithEureka 自己作为一个消费者,自己只是一个消费者,不去做服务注册
  if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    try {
      // 服务注册
      if (!register() ) {
        throw new IllegalStateException("Registration error at startup. Invalid server response.");
      }
    } catch (Throwable th) {
      logger.error("Registration error at startup: {}", th.getMessage());
      throw new IllegalStateException(th);
    }
  }
  // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
  initScheduledTasks();
  try {
    Monitors.registerObject(this);
  } catch (Throwable e) {
    logger.warn("Cannot register timers", e);
  }
  // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
  // to work with DI'd DiscoveryClient
  DiscoveryManager.getInstance().setDiscoveryClient(this);
  DiscoveryManager.getInstance().setEurekaClientConfig(config);
  initTimestampMs = System.currentTimeMillis();
  initRegistrySize = this.getApplications().size();
  registrySize = initRegistrySize;
  logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
              initTimestampMs, initRegistrySize);
}


服务注册


// 服务注册
// DiscoveryClient
boolean register() throws Throwable {
  logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  EurekaHttpResponse<Void> httpResponse;
  try {
    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  } catch (Exception e) {
    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
    throw e;
  }
  if (logger.isInfoEnabled()) {
    logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  }
  return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// AbstractJerseyEurekaHttpClient
// 本质就是一个 Jersey 对 Eureka Server 注册接口调用 
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
  String urlPath = "apps/" + info.getAppName();
  ClientResponse response = null;
  try {
    Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
    addExtraHeaders(resourceBuilder);
    response = resourceBuilder
      .header("Accept-Encoding", "gzip")
      .type(MediaType.APPLICATION_JSON_TYPE)
      .accept(MediaType.APPLICATION_JSON)
      .post(ClientResponse.class, info); // info 客户端的数据
    return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
  } finally {
    if (logger.isDebugEnabled()) {
      logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                   response == null ? "N/A" : response.getStatus());
    }
    if (response != null) {
      response.close();
    }
  }
}
// 服务端就是在  ApplicationResource#addInstance 处理


二. 心跳连接


// DiscoveryClient
boolean renew() {
  EurekaHttpResponse<InstanceInfo> httpResponse;
  try {
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    // 判断是否返回的是 NOT_FOUND 404
    if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
      REREGISTER_COUNTER.increment();
      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
      long timestamp = instanceInfo.setIsDirtyWithTime();
      boolean success = register();
      if (success) {
        instanceInfo.unsetIsDirty(timestamp);
      }
      return success;
    }
    return httpResponse.getStatusCode() == Status.OK.getStatusCode();
  } catch (Throwable e) {
    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    return false;
  }
}


三. 服务发现


  1. 是否全量拉取, 拉取 eureka 所有的注册信息。


  1. 是否增量拉取, 拉取 eureka 最近三分钟数据;是否有更新的信息,如果有更新


private boolean fetchRegistry(boolean forceFullRegistryFetch) {
  Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
  try {
    // If the delta is disabled or if it is the first time, get all
    // applications
    // 获取客户端缓存信息
    Applications applications = getApplications();
    if (clientConfig.shouldDisableDelta()  // 配置只拉取全量
        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) // VIP 地址, 当前 eureka client 是否对单一的注册地址感兴趣
        || forceFullRegistryFetch // 强制全量拉取
        || (applications == null) // 初始化的时候是全量
        || (applications.getRegisteredApplications().size() == 0)
        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
      logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
      logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
      logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
      logger.info("Application is null : {}", (applications == null));
      logger.info("Registered Applications size is zero : {}",
                  (applications.getRegisteredApplications().size() == 0));
      logger.info("Application version is -1: {}", (applications.getVersion() == -1));
      getAndStoreFullRegistry();       // 全量拉取
    } else {
      getAndUpdateDelta(applications); // 增量拉取
    }
    applications.setAppsHashCode(applications.getReconcileHashCode());
    logTotalInstances();
  } catch (Throwable e) {
    logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
    return false;
  } finally {
    if (tracer != null) {
      tracer.stop();
    }
  }
  // Notify about cache refresh before updating the instance remote status
  onCacheRefreshed();
  // Update remote status based on refreshed data held in the cache
  updateInstanceRemoteStatus();
  // registry was fetched successfully, so return true
  return true;
}


增量服务拉取, 防止并发这里有一个加锁的操作。


// 服务增量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
  long currentUpdateGeneration = fetchRegistryGeneration.get();
  Applications delta = null;
  EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
  if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
    delta = httpResponse.getEntity();
  }
  if (delta == null) {
    logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
    getAndStoreFullRegistry();
  } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String reconcileHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
      try {
        updateDelta(delta);
        reconcileHashCode = getReconcileHashCode(applications);
      } finally {
        fetchRegistryUpdateLock.unlock();
      }
    } else {
      logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }
    // There is a diff in number of instances for some reason
    if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
      reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
    }
  } else {
    logger.warn("Not updating application delta as another thread is updating it already");
    logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
  }
}


服务端 recentlyChangedQueue 清空 3 分钟内被修改的数据

30 秒执行一次定时任务, 定时任务里面清空 3 分钟没有更新的微服务实例

让我们的增量 , 与全量数据的 hashCode

client:  本地的数 hashCode + 增量的 hashCode

与服务端传过来的 hashCode  判断是否相同


Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());


四. 服务下架


  1. 手动下架,注入 DiscoveryClient 然后调用 shutdown 方法


// DiscoveryClient
public synchronized void shutdown() {
  if (isShutdown.compareAndSet(false, true)) {
    logger.info("Shutting down DiscoveryClient ...");
    if (statusChangeListener != null && applicationInfoManager != null) {
      applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
    }
    cancelScheduledTasks();
    // If APPINFO was registered
    if (applicationInfoManager != null
        && clientConfig.shouldRegisterWithEureka()
        && clientConfig.shouldUnregisterOnShutdown()) {
      applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
      unregister();
    }
    if (eurekaTransport != null) {
      eurekaTransport.shutdown();
    }
    heartbeatStalenessMonitor.shutdown();
    registryStalenessMonitor.shutdown();
    Monitors.unregisterObject(this);
    logger.info("Completed shut down of DiscoveryClient");
  }
}


Eureka 总结


Eureka 是一个比较优秀的服务注册中心, 实现了 AP 主要是保证可用性和分区容错性。


  1. Eureka Server 的所有的节点配置信息是存储在内存中的,查询服务服务注册信息使用了多层缓存。


  • 多层缓存:首先会进行一级缓存 **readOnlyCacheMap**读取,然后读取二级缓存**readWriteCacheMap**,  最后读取真实数据
  • 缓存过期,在接收到 register , renew  cancel 请求后会失效二级缓存;服务剔除会删除二级缓存;二级缓存本身过期。
  • 缓存更新,一级缓存中查询不到的时候,会从二级缓存中查询, 如果二级缓存中存在,一级缓存中不存在会同步到一级缓存中。定时任务 3分钟也会主动同步一次一级缓存。


  1. Eureka Server 集群环境中,注册的服务只会向一台服务发起注册,然后当前服务端节点会遍历其它节点进行注册信息同步。


  1. Eureka Client 默认会 30 秒中发送一次心跳来进行续约,告知 Eureka Server  客户端依然存活没有问题,如果 Eureka Server 90 秒钟没有收到客户端的续约,它会将实例从注册表中删除。


  1. Eureka Server自我保护机制,当大量的服务过期的时候,存活服务低于 85% 的时候,就会启动自我保护机制,每次只会下线 15%的服务。


相关文章
|
6月前
|
存储 Java 文件存储
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— logback.xml 配置文件解析
本文解析了 `logback.xml` 配置文件的详细内容,包括日志输出格式、存储路径、控制台输出及日志级别等关键配置。通过定义 `LOG_PATTERN` 和 `FILE_PATH`,设置日志格式与存储路径;利用 `&lt;appender&gt;` 节点配置控制台和文件输出,支持日志滚动策略(如文件大小限制和保存时长);最后通过 `&lt;logger&gt;` 和 `&lt;root&gt;` 定义日志级别与输出方式。此配置适用于精细化管理日志输出,满足不同场景需求。
1320 1
|
8月前
|
XML Java 开发者
Spring底层架构核心概念解析
理解 Spring 框架的核心概念对于开发和维护 Spring 应用程序至关重要。IOC 和 AOP 是其两个关键特性,通过依赖注入和面向切面编程实现了高效的模块化和松耦合设计。Spring 容器管理着 Beans 的生命周期和配置,而核心模块为各种应用场景提供了丰富的功能支持。通过全面掌握这些核心概念,开发者可以更加高效地利用 Spring 框架开发企业级应用。
223 18
|
6月前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
105 0
|
8月前
|
存储 弹性计算 安全
Cloud Backup深度解析:从被动防御到主动保护
《Cloud Backup深度解析:从被动防御到主动保护》由阿里云高级技术专家张磊分享,探讨企业数据保护面临的挑战及应对策略。内容涵盖企业数据安全威胁、小概率事件的高风险性、传统备份系统的不足,以及通过四步主动防御策略(资源发现、风险检测、数据锁定、全局巡检)实现高效的数据保护。同时介绍了基于标签的自动策略关联、多种备份引擎、恶意文件检测、探测效率优化等关键技术,确保备份数据的安全性和完整性。此外,还展示了数据灾备中心和全方位主动数据保护机制,帮助企业在面对勒索病毒、内部攻击等威胁时,构建更强大的防护体系。
|
9月前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
8月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
283 7
|
7月前
|
传感器 监控 安全
智慧工地云平台的技术架构解析:微服务+Spring Cloud如何支撑海量数据?
慧工地解决方案依托AI、物联网和BIM技术,实现对施工现场的全方位、立体化管理。通过规范施工、减少安全隐患、节省人力、降低运营成本,提升工地管理的安全性、效率和精益度。该方案适用于大型建筑、基础设施、房地产开发等场景,具备微服务架构、大数据与AI分析、物联网设备联网、多端协同等创新点,推动建筑行业向数字化、智能化转型。未来将融合5G、区块链等技术,助力智慧城市建设。
311 0
|
10月前
|
前端开发 Java 开发者
Spring MVC中的请求映射:@RequestMapping注解深度解析
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的关键,它将HTTP请求映射到相应的处理器方法上。本文将深入探讨`@RequestMapping`注解的工作原理、使用方法以及最佳实践,为开发者提供一份详尽的技术干货。
838 2
|
10月前
|
前端开发 Java Spring
探索Spring MVC:@Controller注解的全面解析
在Spring MVC框架中,`@Controller`注解是构建Web应用程序的基石之一。它不仅简化了控制器的定义,还提供了一种优雅的方式来处理HTTP请求。本文将全面解析`@Controller`注解,包括其定义、用法、以及在Spring MVC中的作用。
229 2
|
10月前
|
前端开发 Java Maven
深入解析:如何用 Spring Boot 实现分页和排序
深入解析:如何用 Spring Boot 实现分页和排序
674 2

推荐镜像

更多
  • DNS