Spring Cloud Eureka 源码解析(中)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文主要是从 Eureka 源码的角度分析 Eureka 的实现原理和业务细节流程, 在本文的开头也给出了集群模式服务端的配置以及客户端的配置 demo.

四. 集群原理


  1. 集群同步由 PeerAwareInstanceRegistryImpl 来负责处理。


  1. 如果是一个 client node 注册到 server  node 那么接收到这个 server  node 会把 client node 的请求 转发 到其它的 server node


// 集群同步
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
  Stopwatch tracer = action.getTimer().start();
  try {
    if (isReplication) {
      numberOfReplicationsLastMin.increment();
    }
    // If it is a replication already, do not replicate again as this will create a poison replication
    // 所有集群节点(peerEurekaNodes)为空或者是一个集群操作(isReplication),防止死循环注册
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
      return;
    }
    // 同步到所有节点
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
      // If the url represents this host, do not replicate to yourself.
      if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { // 排除当前节点
        continue;
      }
      replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
  } finally {
    tracer.stop();
  }
}


  1. 集群同步事件处理 replicateInstanceActionsToPeers


// 集群同步事件处理
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
  try {
    InstanceInfo infoFromRegistry;
    CurrentRequestVersion.set(Version.V2);
    switch (action) {
      case Cancel:    // 取消
        node.cancel(appName, id);
        break;
      case Heartbeat: // 心跳
        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
        break;
      case Register:  // 注册
        node.register(info);
        break;
      case StatusUpdate:
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
        break;
      case DeleteStatusOverride:
        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
        node.deleteStatusOverride(appName, id, infoFromRegistry);
        break;
    }
  } catch (Throwable t) {
    logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
  } finally {
    CurrentRequestVersion.remove();
  }
}


  1. 集群启动同步


public int syncUp() {
  // Copy entire entry from neighboring DS node
  int count = 0;
  for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
    if (i > 0) {
      try {
        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
      } catch (InterruptedException e) {
        logger.warn("Interrupted during registry transfer..");
        break;
      }
    }
    Applications apps = eurekaClient.getApplications();
    for (Application app : apps.getRegisteredApplications()) {
      for (InstanceInfo instance : app.getInstances()) {
        try {
          if (isRegisterable(instance)) {
            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
            count++;
          }
        } catch (Throwable t) {
          logger.error("During DS init copy", t);
        }
      }
    }
  }
  return count;
}


五. 自我保护机制


  1. 是否启动自我保护机制


// 配置参数 isSelfPreservationModeEnabled
public boolean isSelfPreservationModeEnabled() {
  return serverConfig.shouldEnableSelfPreservation();
}


  1. 触发自我保护机制的配置,  触发的条件,短时间内,大量的心跳连接过期。就是大量宕机。eureka 就会触发自我保护机制。大量的阈值是 85%。


// 触发自我保护机制的配置阈值 numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() {
  if (!isSelfPreservationModeEnabled()) {
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
  }
  return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}


  1. 服务注册的执行的自我保护机制


// The lease does not exist and hence it is a new registration
synchronized (lock) {
  if (this.expectedNumberOfClientsSendingRenews > 0) {
    // Since the client wants to register it, increase the number of clients sending renews
    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
    updateRenewsPerMinThreshold();
  }
}


更改自我保护的阈值 updateRenewsPerMinThreshold


// getExpectedClientRenewalIntervalSeconds 默认每分钟发送心跳字数默认两次
// 计算公式: 预估心跳值(所有注册上来的实例) * 每分钟触发心跳连接次数(60s / 服务端每分钟刷新时间默认 30s) * 自我保护机制触发的百分比(默认 85%)
// 就是 15% 没有连接上,就触发自我保护机制
protected void updateRenewsPerMinThreshold() {
  this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                                              * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                                              * serverConfig.getRenewalPercentThreshold());
}


自我保护机制的定时更改


private void scheduleRenewalThresholdUpdateTask() {
  timer.schedule(new TimerTask() {
    @Override
    public void run() {
      updateRenewalThreshold();
    }
  }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                 serverConfig.getRenewalThresholdUpdateIntervalMs());
}
复制代码


总结:如果想要自我保护机制正常工作,需要客户端的每分钟心跳次数与服务端的配置相同


六.  缓存机制


Eureka 3 层缓存


  1. 只读缓存: ConcurrentHashMap
  2. 读写缓存: guava
  3. 真实数据: ConcurrentHashMap


目的:如果直接操作真实数据,这样就可以减少锁的抢占,带来效率上的提升;会出现的问题就是数据不一致,当时不能带来强一致性 95%


  1. 服务发现,获取服务信息的获取


@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
  boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
  String[] regions = null;
  if (!isRemoteRegionRequested) {
    EurekaMonitors.GET_ALL.increment();
  } else {
    regions = regionsStr.toLowerCase().split(",");
    Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
    EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
  }
  // Check if the server allows the access to the registry. The server can
  // restrict access if it is not
  // ready to serve traffic depending on various reasons.
  if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
    return Response.status(Status.FORBIDDEN).build();
  }
  CurrentRequestVersion.set(Version.toEnum(version));
  KeyType keyType = Key.KeyType.JSON;
  String returnMediaType = MediaType.APPLICATION_JSON;
  if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
    keyType = Key.KeyType.XML;
    returnMediaType = MediaType.APPLICATION_XML;
  }
  Key cacheKey = new Key(Key.EntityType.Application,
                         ResponseCacheImpl.ALL_APPS,
                         keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
                        );
  Response response;
  if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    // responseCache 缓存对象
    response = Response.ok(responseCache.getGZIP(cacheKey))
      .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
      .header(HEADER_CONTENT_TYPE, returnMediaType)
      .build();
  } else {
    response = Response.ok(responseCache.get(cacheKey))
      .build();
  }
  CurrentRequestVersion.remove();
  return response;
}


最终查询调用 getValue 方法


// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      // 1. 只读缓存
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // 2. 读写缓存 guava
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}


查询真实数据


private Value generatePayload(Key key) {
  Stopwatch tracer = null;
  try {
    String payload;
    switch (key.getEntityType()) {
      case Application:
        boolean isRemoteRegionRequested = key.hasRegions();
        if (ALL_APPS.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeAllAppsWithRemoteRegionTimer.start();
            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeAllAppsTimer.start();
            payload = getPayLoad(key, registry.getApplications());
          }
        } else if (ALL_APPS_DELTA.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
            versionDeltaWithRegions.incrementAndGet();
            versionDeltaWithRegionsLegacy.incrementAndGet();
            payload = getPayLoad(key,
                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            payload = getPayLoad(key, registry.getApplicationDeltas());
          }
        } else {
          tracer = serializeOneApptimer.start();
          payload = getPayLoad(key, registry.getApplication(key.getName()));
        }
        break;
      case VIP:
      case SVIP:
        tracer = serializeViptimer.start();
        payload = getPayLoad(key, getApplicationsForVip(key, registry));
        break;
      default:
        logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
        payload = "";
        break;
    }
    return new Value(payload);
  } finally {
    if (tracer != null) {
      tracer.stop();
    }
  }
}


缓存读取总结:


  1. 首先进入只读缓存;
  2. 如果只读缓存没有的话就进入读写缓存;
  3. 如果读写缓存也没有, 就执行监听器的逻辑,从真实数据里面拿。

缓存在什么时候会发生更改?


  1. 只读缓存修改的地点,  只读缓存只能通过定时任务,每 30 秒进行一次同步更新。
  2. 如果只读缓存找不到,但是读写缓存可以查询到,就更新到只读缓存。
  3. 延迟的统计:30秒计时器延迟 + 客户端缓存延迟 30s + ribbon (1s) = 61秒


// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // 如果只读缓存找不到,但是读写缓存可以查询到的时候会更新。
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}
// 只读缓存更新,30s 执行一次同步数据
private TimerTask getCacheUpdateTask() {
  return new TimerTask() {
    @Override
    public void run() {
      logger.debug("Updating the client cache from response cache");
      for (Key key : readOnlyCacheMap.keySet()) {
        if (logger.isDebugEnabled()) {
          logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                       key.getEntityType(), key.getName(), key.getVersion(), key.getType());
        }
        try {
          CurrentRequestVersion.set(key.getVersion());
          Value cacheValue = readWriteCacheMap.get(key);
          Value currentCacheValue = readOnlyCacheMap.get(key);
          if (cacheValue != currentCacheValue) {
            readOnlyCacheMap.put(key, cacheValue);
          }
        } catch (Throwable th) {
          logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
        } finally {
          CurrentRequestVersion.remove();
        }
      }
    }
  };
}


当做服务发现的时候会锁住注册(修改、下架)操作,服务发现的时候加的是写锁, 注册(修改、下架)操作时读锁;


这样设计的目的是为了尽可能的保证读的准确性。


相关文章
|
18天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
22天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
51 12
|
19天前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
39 2
|
24天前
|
前端开发 Java 开发者
Spring MVC中的请求映射:@RequestMapping注解深度解析
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的关键,它将HTTP请求映射到相应的处理器方法上。本文将深入探讨`@RequestMapping`注解的工作原理、使用方法以及最佳实践,为开发者提供一份详尽的技术干货。
72 2
|
24天前
|
前端开发 Java Spring
探索Spring MVC:@Controller注解的全面解析
在Spring MVC框架中,`@Controller`注解是构建Web应用程序的基石之一。它不仅简化了控制器的定义,还提供了一种优雅的方式来处理HTTP请求。本文将全面解析`@Controller`注解,包括其定义、用法、以及在Spring MVC中的作用。
40 2
|
25天前
|
前端开发 Java Maven
深入解析:如何用 Spring Boot 实现分页和排序
深入解析:如何用 Spring Boot 实现分页和排序
47 2
|
27天前
|
Java 开发者 Spring
Spring AOP深度解析:探秘动态代理与增强逻辑
Spring框架中的AOP(Aspect-Oriented Programming,面向切面编程)功能为开发者提供了一种强大的工具,用以将横切关注点(如日志、事务管理等)与业务逻辑分离。本文将深入探讨Spring AOP的底层原理,包括动态代理机制和增强逻辑的实现。
39 4
|
24天前
|
前端开发 Java 开发者
Spring MVC中的控制器:@Controller注解全解析
在Spring MVC框架中,`@Controller`注解是构建Web应用程序控制层的核心。它不仅简化了控制器的定义,还提供了灵活的请求映射和处理机制。本文将深入探讨`@Controller`注解的用法、特点以及在实际开发中的应用。
60 0
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
72 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
77 0

推荐镜像

更多