Spring Cloud Eureka 源码解析(中)

简介: 本文主要是从 Eureka 源码的角度分析 Eureka 的实现原理和业务细节流程, 在本文的开头也给出了集群模式服务端的配置以及客户端的配置 demo.

四. 集群原理


  1. 集群同步由 PeerAwareInstanceRegistryImpl 来负责处理。


  1. 如果是一个 client node 注册到 server  node 那么接收到这个 server  node 会把 client node 的请求 转发 到其它的 server node


// 集群同步
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
  Stopwatch tracer = action.getTimer().start();
  try {
    if (isReplication) {
      numberOfReplicationsLastMin.increment();
    }
    // If it is a replication already, do not replicate again as this will create a poison replication
    // 所有集群节点(peerEurekaNodes)为空或者是一个集群操作(isReplication),防止死循环注册
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
      return;
    }
    // 同步到所有节点
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
      // If the url represents this host, do not replicate to yourself.
      if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { // 排除当前节点
        continue;
      }
      replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
  } finally {
    tracer.stop();
  }
}


  1. 集群同步事件处理 replicateInstanceActionsToPeers


// 集群同步事件处理
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
  try {
    InstanceInfo infoFromRegistry;
    CurrentRequestVersion.set(Version.V2);
    switch (action) {
      case Cancel:    // 取消
        node.cancel(appName, id);
        break;
      case Heartbeat: // 心跳
        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
        break;
      case Register:  // 注册
        node.register(info);
        break;
      case StatusUpdate:
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
        break;
      case DeleteStatusOverride:
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.deleteStatusOverride(appName, id, infoFromRegistry);
        break;
    }
  } catch (Throwable t) {
    logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
  } finally {
    CurrentRequestVersion.remove();
  }
}


  1. 集群启动同步


public int syncUp() {
  // Copy entire entry from neighboring DS node
  int count = 0;
  for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
    if (i > 0) {
      try {
        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
      } catch (InterruptedException e) {
        logger.warn("Interrupted during registry transfer..");
        break;
      }
    }
    Applications apps = eurekaClient.getApplications();
    for (Application app : apps.getRegisteredApplications()) {
      for (InstanceInfo instance : app.getInstances()) {
        try {
          if (isRegisterable(instance)) {
            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
            count++;
          }
        } catch (Throwable t) {
          logger.error("During DS init copy", t);
        }
      }
    }
  }
  return count;
}


五. 自我保护机制


  1. 是否启动自我保护机制


// 配置参数 isSelfPreservationModeEnabled
public boolean isSelfPreservationModeEnabled() {
  return serverConfig.shouldEnableSelfPreservation();
}


  1. 触发自我保护机制的配置,  触发的条件,短时间内,大量的心跳连接过期。就是大量宕机。eureka 就会触发自我保护机制。大量的阈值是 85%。


// 触发自我保护机制的配置阈值 numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() {
  if (!isSelfPreservationModeEnabled()) {
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
  }
  return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}


  1. 服务注册的执行的自我保护机制


// The lease does not exist and hence it is a new registration
synchronized (lock) {
  if (this.expectedNumberOfClientsSendingRenews > 0) {
    // Since the client wants to register it, increase the number of clients sending renews
    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
    updateRenewsPerMinThreshold();
  }
}


更改自我保护的阈值 updateRenewsPerMinThreshold


// getExpectedClientRenewalIntervalSeconds 默认每分钟发送心跳字数默认两次
// 计算公式: 预估心跳值(所有注册上来的实例) * 每分钟触发心跳连接次数(60s / 服务端每分钟刷新时间默认 30s) * 自我保护机制触发的百分比(默认 85%)
// 就是 15% 没有连接上,就触发自我保护机制
protected void updateRenewsPerMinThreshold() {
  this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                                              * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                                              * serverConfig.getRenewalPercentThreshold());
}


自我保护机制的定时更改


private void scheduleRenewalThresholdUpdateTask() {
  timer.schedule(new TimerTask() {
    @Override
    public void run() {
      updateRenewalThreshold();
    }
  }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                 serverConfig.getRenewalThresholdUpdateIntervalMs());
}
复制代码


总结:如果想要自我保护机制正常工作,需要客户端的每分钟心跳次数与服务端的配置相同


六.  缓存机制


Eureka 3 层缓存


  1. 只读缓存: ConcurrentHashMap
  2. 读写缓存: guava
  3. 真实数据: ConcurrentHashMap


目的:如果直接操作真实数据,这样就可以减少锁的抢占,带来效率上的提升;会出现的问题就是数据不一致,当时不能带来强一致性 95%


  1. 服务发现,获取服务信息的获取


@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
  boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
  String[] regions = null;
  if (!isRemoteRegionRequested) {
    EurekaMonitors.GET_ALL.increment();
  } else {
    regions = regionsStr.toLowerCase().split(",");
    Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
    EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
  }
  // Check if the server allows the access to the registry. The server can
  // restrict access if it is not
  // ready to serve traffic depending on various reasons.
  if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
    return Response.status(Status.FORBIDDEN).build();
  }
  CurrentRequestVersion.set(Version.toEnum(version));
  KeyType keyType = Key.KeyType.JSON;
  String returnMediaType = MediaType.APPLICATION_JSON;
  if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
    keyType = Key.KeyType.XML;
    returnMediaType = MediaType.APPLICATION_XML;
  }
  Key cacheKey = new Key(Key.EntityType.Application,
                         ResponseCacheImpl.ALL_APPS,
                         keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
                        );
  Response response;
  if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    // responseCache 缓存对象
    response = Response.ok(responseCache.getGZIP(cacheKey))
      .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
      .header(HEADER_CONTENT_TYPE, returnMediaType)
      .build();
  } else {
    response = Response.ok(responseCache.get(cacheKey))
      .build();
  }
  CurrentRequestVersion.remove();
  return response;
}


最终查询调用 getValue 方法


// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      // 1. 只读缓存
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // 2. 读写缓存 guava
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}


查询真实数据


private Value generatePayload(Key key) {
  Stopwatch tracer = null;
  try {
    String payload;
    switch (key.getEntityType()) {
      case Application:
        boolean isRemoteRegionRequested = key.hasRegions();
        if (ALL_APPS.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeAllAppsWithRemoteRegionTimer.start();
            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeAllAppsTimer.start();
            payload = getPayLoad(key, registry.getApplications());
          }
        } else if (ALL_APPS_DELTA.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
            versionDeltaWithRegions.incrementAndGet();
            versionDeltaWithRegionsLegacy.incrementAndGet();
            payload = getPayLoad(key,
                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            payload = getPayLoad(key, registry.getApplicationDeltas());
          }
        } else {
          tracer = serializeOneApptimer.start();
          payload = getPayLoad(key, registry.getApplication(key.getName()));
        }
        break;
      case VIP:
      case SVIP:
        tracer = serializeViptimer.start();
        payload = getPayLoad(key, getApplicationsForVip(key, registry));
        break;
      default:
        logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
        payload = "";
        break;
    }
    return new Value(payload);
  } finally {
    if (tracer != null) {
      tracer.stop();
    }
  }
}


缓存读取总结:


  1. 首先进入只读缓存;
  2. 如果只读缓存没有的话就进入读写缓存;
  3. 如果读写缓存也没有, 就执行监听器的逻辑,从真实数据里面拿。

缓存在什么时候会发生更改?


  1. 只读缓存修改的地点,  只读缓存只能通过定时任务,每 30 秒进行一次同步更新。
  2. 如果只读缓存找不到,但是读写缓存可以查询到,就更新到只读缓存。
  3. 延迟的统计:30秒计时器延迟 + 客户端缓存延迟 30s + ribbon (1s) = 61秒


// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // 如果只读缓存找不到,但是读写缓存可以查询到的时候会更新。
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}
// 只读缓存更新,30s 执行一次同步数据
private TimerTask getCacheUpdateTask() {
  return new TimerTask() {
    @Override
    public void run() {
      logger.debug("Updating the client cache from response cache");
      for (Key key : readOnlyCacheMap.keySet()) {
        if (logger.isDebugEnabled()) {
          logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                       key.getEntityType(), key.getName(), key.getVersion(), key.getType());
        }
        try {
          CurrentRequestVersion.set(key.getVersion());
          Value cacheValue = readWriteCacheMap.get(key);
          Value currentCacheValue = readOnlyCacheMap.get(key);
          if (cacheValue != currentCacheValue) {
            readOnlyCacheMap.put(key, cacheValue);
          }
        } catch (Throwable th) {
          logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
        } finally {
          CurrentRequestVersion.remove();
        }
      }
    }
  };
}


当做服务发现的时候会锁住注册(修改、下架)操作,服务发现的时候加的是写锁, 注册(修改、下架)操作时读锁;


这样设计的目的是为了尽可能的保证读的准确性。


相关文章
|
6月前
|
存储 Java 文件存储
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— logback.xml 配置文件解析
本文解析了 `logback.xml` 配置文件的详细内容,包括日志输出格式、存储路径、控制台输出及日志级别等关键配置。通过定义 `LOG_PATTERN` 和 `FILE_PATH`,设置日志格式与存储路径;利用 `&lt;appender&gt;` 节点配置控制台和文件输出,支持日志滚动策略(如文件大小限制和保存时长);最后通过 `&lt;logger&gt;` 和 `&lt;root&gt;` 定义日志级别与输出方式。此配置适用于精细化管理日志输出,满足不同场景需求。
1320 1
|
14天前
|
设计模式 Java 开发者
如何快速上手【Spring AOP】?从动态代理到源码剖析(下篇)
Spring AOP的实现本质上依赖于代理模式这一经典设计模式。代理模式通过引入代理对象作为目标对象的中间层,实现了对目标对象访问的控制与增强,其核心价值在于解耦核心业务逻辑与横切关注点。在框架设计中,这种模式广泛用于实现功能扩展(如远程调用、延迟加载)、行为拦截(如权限校验、异常处理)等场景,为系统提供了更高的灵活性和可维护性。
|
5月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
364 70
|
8月前
|
XML Java 开发者
Spring底层架构核心概念解析
理解 Spring 框架的核心概念对于开发和维护 Spring 应用程序至关重要。IOC 和 AOP 是其两个关键特性,通过依赖注入和面向切面编程实现了高效的模块化和松耦合设计。Spring 容器管理着 Beans 的生命周期和配置,而核心模块为各种应用场景提供了丰富的功能支持。通过全面掌握这些核心概念,开发者可以更加高效地利用 Spring 框架开发企业级应用。
223 18
|
6月前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
105 0
|
9月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
9月前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
8月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
283 7
|
7月前
|
传感器 监控 安全
智慧工地云平台的技术架构解析:微服务+Spring Cloud如何支撑海量数据?
慧工地解决方案依托AI、物联网和BIM技术,实现对施工现场的全方位、立体化管理。通过规范施工、减少安全隐患、节省人力、降低运营成本,提升工地管理的安全性、效率和精益度。该方案适用于大型建筑、基础设施、房地产开发等场景,具备微服务架构、大数据与AI分析、物联网设备联网、多端协同等创新点,推动建筑行业向数字化、智能化转型。未来将融合5G、区块链等技术,助力智慧城市建设。
311 0

推荐镜像

更多
  • DNS