Spring Cloud Eureka 源码解析(上)

简介: 本文主要是从 Eureka 源码的角度分析 Eureka 的实现原理和业务细节流程, 在本文的开头也给出了集群模式服务端的配置以及客户端的配置 demo.

Eureka 相关的历程


Sprng Cloud Netflix 部分项目停止更新说明


大致就是说 eureka2.x 不再更新,但是 eureka1.x 进入维护阶段。

spring.io/blog/2019/0…


Spring Cloud Hoxton 发布说明


Hoxton 版本主要特征是支持响应式编程。

spring.io/blog/2019/1…


集群模式配置


版本概述:


spring-boot 2.4.2

spring-cloud 2020.0.1


服务端配置


  1. pom.xml


<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
  </dependency>
</dependencies>  
<dependencyManagement>
  <dependencies>
    <!--spring boot 2.4.2 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!--spring cloud 2020.0.1 -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>   


  1. application.yml


server:
  port: 3001
eureka:
  server:
    enable-self-preservation: false # 关闭自我保护机制
    eviction-interval-timer-in-ms: 4000 # 设置间隔(单位:毫秒)
  instance:
    hostname: eureka3000
  client:
    register-with-eureka: false # 不把自己作为一个客户端注册到自己
    fetch-registry: false # 不需要从服务端获取注册信息
    service-url:
      default-zone: http://eureka3001.com:3001/eureka,http://eureka3001.com:3002/eureka,http://eureka3001.com:3003/eureka


  1. 启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class);
    }
}


客户端配置


  1. pom.xml


<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
</dependencies>


  1. yml


server:
  port: 6000
eureka:
  client:
    serviceUrl:
      # 这里要注意是小驼峰如果填写为中划线, 会导致配置失效
      defaultZone: http://127.0.0.1:3000/eureka # eureka 服务端提供的注册地址
  instance:
    instance-id: power-1 #此实例注册到eureka服务端的唯一的实例ID
    prefer-ip-address: true #是否显示IP地址
    leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒)
    leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
spring:
  application:
    name: servcie-client


  1. 启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class EurekaClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class);
    }
}


Spring Boot 自定义 starter


Eureka Server 源码


Eureka 不是采用 Spring MVC 作为 Web 通讯框架的采用的是 jersey 作为底层框架,和 Spring MVC 的区别是采用的是 Filter 作为处理请求派发。


一. 服务注册


注册信息的存储,new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); 第一个 string 对应服务名,内层 String 代表实例的id , Lease 租债器, InstanceInfo 表示服务信息


// 服务注册代码
// AbstractInstanceRegistry
// registrant 本次注册请求 传过来的注册信息
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            // registry 所有的注册信息存储地址
            // gMap 通过服务名拿到的微服务组
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 已经存在的微服务实例对象
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 当前存在的微服务实例对象的最后操作时间戳
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // 传过来的注册实例的时间戳
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // 那个时间戳比较靠前就用新的
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // getHolder() 具体的微服务实例对象
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // 如果冲突了,自我保护阈值更新
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Lease 租债器对象
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // 更新最后正常工作时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 服务注册
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }


二. 服务下架


启动定时器,去判断服务是否过期,判断当前时间是否大于过期时间 lastUpdateTimestamp。


判断公式: 当前系统时间 > lastUpdateTimestamp + 30s (服务过期时间)


  1. 服务剔除


// 遍历所有的服务信息 15分钟一次, 15% 以上需要剔除
public void evict(long additionalLeaseMs) {
  logger.debug("Running the evict task");
  if (!isLeaseExpirationEnabled()) {
    logger.debug("DS: lease expiration is currently disabled.");
    return;
  }
  // We collect first all expired items, to evict them in random order. For large eviction sets,
  // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
  // the impact should be evenly distributed across all applications.
  // 遍历所有的服务信息,判断是否过期放入过期的列表中
  List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
  for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
    Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
    if (leaseMap != null) {
      for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
        Lease<InstanceInfo> lease = leaseEntry.getValue();
        // 判断是否服务过期
        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
          expiredLeases.add(lease);
        }
      }
    }
  }
  // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
  // triggering self-preservation. Without that we would wipe out full registry.
  // 剔除的数量过大的话, 先剔除一部分
  int registrySize = (int) getLocalRegistrySize();
  // serverConfig.getRenewalPercentThreshold() 默认 85%
  int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
  int evictionLimit = registrySize - registrySizeThreshold;
  int toEvict = Math.min(expiredLeases.size(), evictionLimit);
  if (toEvict > 0) {
    logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    // 随机剔除, 类似洗牌算法
    Random random = new Random(System.currentTimeMillis());
    for (int i = 0; i < toEvict; i++) {
      // Pick a random item (Knuth shuffle algorithm)
      int next = i + random.nextInt(expiredLeases.size() - i);
      Collections.swap(expiredLeases, i, next);
      Lease<InstanceInfo> lease = expiredLeases.get(i);
      String appName = lease.getHolder().getAppName();
      String id = lease.getHolder().getId();
      EXPIRED.increment();
      logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
      internalCancel(appName, id, false);
    }
  }
}


三. 心跳连接


更改最后操作时间


// InstanceResource
@PUT
public Response renewLease(
  @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
  @QueryParam("overriddenstatus") String overriddenStatus,
  @QueryParam("status") String status,
  @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
  boolean isFromReplicaNode = "true".equals(isReplication);
  boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
  // Not found in the registry, immediately ask for a register
  if (!isSuccess) {
    logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
    return Response.status(Status.NOT_FOUND).build();
  }
  // Check if we need to sync based on dirty time stamp, the client
  // instance might have changed some value
  Response response;
  if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
    response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    // Store the overridden status since the validation found out the node that replicates wins
    if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
        && (overriddenStatus != null)
        && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
        && isFromReplicaNode) {
      registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    }
  } else {
    response = Response.ok().build();
  }
  logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
  return response;
}


相关文章
|
5月前
|
数据采集 人工智能 Java
1天消化完Spring全家桶文档!DevDocs:一键深度解析开发文档,自动发现子URL并建立图谱
DevDocs是一款基于智能爬虫技术的开源工具,支持1-5层深度网站结构解析,能将技术文档处理时间从数周缩短至几小时,并提供Markdown/JSON格式输出与AI工具无缝集成。
189 1
1天消化完Spring全家桶文档!DevDocs:一键深度解析开发文档,自动发现子URL并建立图谱
|
5月前
|
安全 Java API
深入解析 Spring Security 配置中的 CSRF 启用与 requestMatchers 报错问题
本文深入解析了Spring Security配置中CSRF启用与`requestMatchers`报错的常见问题。针对CSRF,指出默认已启用,无需调用`enable()`,只需移除`disable()`即可恢复。对于`requestMatchers`多路径匹配报错,分析了Spring Security 6.x中方法签名的变化,并提供了三种解决方案:分次调用、自定义匹配器及降级使用`antMatchers()`。最后提醒开发者关注版本兼容性,确保升级平稳过渡。
608 2
|
6月前
|
存储 Java 文件存储
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— logback.xml 配置文件解析
本文解析了 `logback.xml` 配置文件的详细内容,包括日志输出格式、存储路径、控制台输出及日志级别等关键配置。通过定义 `LOG_PATTERN` 和 `FILE_PATH`,设置日志格式与存储路径;利用 `&lt;appender&gt;` 节点配置控制台和文件输出,支持日志滚动策略(如文件大小限制和保存时长);最后通过 `&lt;logger&gt;` 和 `&lt;root&gt;` 定义日志级别与输出方式。此配置适用于精细化管理日志输出,满足不同场景需求。
1320 1
|
5月前
|
前端开发 安全 Java
Spring Boot 便利店销售系统项目分包设计解析
本文深入解析了基于Spring Boot的便利店销售系统分包设计,通过清晰的分层架构(表现层、业务逻辑层、数据访问层等)和模块化设计,提升了代码的可维护性、复用性和扩展性。具体分包结构包括`controller`、`service`、`repository`、`entity`、`dto`、`config`和`util`等模块,职责分明,便于团队协作与功能迭代。该设计为复杂企业级应用开发提供了实践参考。
196 0
|
3月前
|
Java 数据库连接 API
Java 对象模型现代化实践 基于 Spring Boot 与 MyBatis Plus 的实现方案深度解析
本文介绍了基于Spring Boot与MyBatis-Plus的Java对象模型现代化实践方案。采用Spring Boot 3.1.2作为基础框架,结合MyBatis-Plus 3.5.3.1进行数据访问层实现,使用Lombok简化PO对象,MapStruct处理对象转换。文章详细讲解了数据库设计、PO对象实现、DAO层构建、业务逻辑封装以及DTO/VO转换等核心环节,提供了一个完整的现代化Java对象模型实现案例。通过分层设计和对象转换,实现了业务逻辑与数据访问的解耦,提高了代码的可维护性和扩展性。
133 1
|
2月前
|
缓存 安全 Java
Spring 框架核心原理与实践解析
本文详解 Spring 框架核心知识,包括 IOC(容器管理对象)与 DI(容器注入依赖),以及通过注解(如 @Service、@Autowired)声明 Bean 和注入依赖的方式。阐述了 Bean 的线程安全(默认单例可能有安全问题,需业务避免共享状态或设为 prototype)、作用域(@Scope 注解,常用 singleton、prototype 等)及完整生命周期(实例化、依赖注入、初始化、销毁等步骤)。 解析了循环依赖的解决机制(三级缓存)、AOP 的概念(公共逻辑抽为切面)、底层动态代理(JDK 与 Cglib 的区别)及项目应用(如日志记录)。介绍了事务的实现(基于 AOP
|
2月前
|
SQL Java 数据库连接
Spring、SpringMVC 与 MyBatis 核心知识点解析
我梳理的这些内容,涵盖了 Spring、SpringMVC 和 MyBatis 的核心知识点。 在 Spring 中,我了解到 IOC 是控制反转,把对象控制权交容器;DI 是依赖注入,有三种实现方式。Bean 有五种作用域,单例 bean 的线程安全问题及自动装配方式也清晰了。事务基于数据库和 AOP,有失效场景和七种传播行为。AOP 是面向切面编程,动态代理有 JDK 和 CGLIB 两种。 SpringMVC 的 11 步执行流程我烂熟于心,还有那些常用注解的用法。 MyBatis 里,#{} 和 ${} 的区别很关键,获取主键、处理字段与属性名不匹配的方法也掌握了。多表查询、动态
|
4月前
|
安全 Java API
Spring Boot 功能模块全解析:构建现代Java应用的技术图谱
Spring Boot不是一个单一的工具,而是一个由众多功能模块组成的生态系统。这些模块可以根据应用需求灵活组合,构建从简单的REST API到复杂的微服务系统,再到现代的AI驱动应用。
|
3月前
|
Java 数据库 开发者
Spring Boot 框架超级详细总结及长尾关键词应用解析
本文深入讲解Spring Boot框架的核心概念、功能特性及实际应用,涵盖自动配置、独立运行、starter依赖等优势。通过Web开发、微服务架构、批处理等适用场景分析,结合在线书店实战案例,演示项目初始化、数据库设计、分层架构实现全流程。同时探讨热部署、多环境配置、缓存机制与事务管理等高级特性,助你高效掌握Spring Boot开发技巧。代码示例详尽,适合从入门到进阶的学习者。
862 0
|
3月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
423 0

推荐镜像

更多
  • DNS