Spring Cloud Eureka 源码解析(上)

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 本文主要是从 Eureka 源码的角度分析 Eureka 的实现原理和业务细节流程, 在本文的开头也给出了集群模式服务端的配置以及客户端的配置 demo.

Eureka 相关的历程


Sprng Cloud Netflix 部分项目停止更新说明


大致就是说 eureka2.x 不再更新,但是 eureka1.x 进入维护阶段。

spring.io/blog/2019/0…


Spring Cloud Hoxton 发布说明


Hoxton 版本主要特征是支持响应式编程。

spring.io/blog/2019/1…


集群模式配置


版本概述:


spring-boot 2.4.2

spring-cloud 2020.0.1


服务端配置


  1. pom.xml


<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
  </dependency>
</dependencies>  
<dependencyManagement>
  <dependencies>
    <!--spring boot 2.4.2 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!--spring cloud 2020.0.1 -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>   


  1. application.yml


server:
  port: 3001
eureka:
  server:
    enable-self-preservation: false # 关闭自我保护机制
    eviction-interval-timer-in-ms: 4000 # 设置间隔(单位:毫秒)
  instance:
    hostname: eureka3000
  client:
    register-with-eureka: false # 不把自己作为一个客户端注册到自己
    fetch-registry: false # 不需要从服务端获取注册信息
    service-url:
      default-zone: http://eureka3001.com:3001/eureka,http://eureka3001.com:3002/eureka,http://eureka3001.com:3003/eureka


  1. 启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class);
    }
}


客户端配置


  1. pom.xml


<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
</dependencies>


  1. yml


server:
  port: 6000
eureka:
  client:
    serviceUrl:
      # 这里要注意是小驼峰如果填写为中划线, 会导致配置失效
      defaultZone: http://127.0.0.1:3000/eureka # eureka 服务端提供的注册地址
  instance:
    instance-id: power-1 #此实例注册到eureka服务端的唯一的实例ID
    prefer-ip-address: true #是否显示IP地址
    leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒)
    leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
spring:
  application:
    name: servcie-client


  1. 启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class EurekaClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class);
    }
}


Spring Boot 自定义 starter


Eureka Server 源码


Eureka 不是采用 Spring MVC 作为 Web 通讯框架的采用的是 jersey 作为底层框架,和 Spring MVC 的区别是采用的是 Filter 作为处理请求派发。


一. 服务注册


注册信息的存储,new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); 第一个 string 对应服务名,内层 String 代表实例的id , Lease 租债器, InstanceInfo 表示服务信息


// 服务注册代码
// AbstractInstanceRegistry
// registrant 本次注册请求 传过来的注册信息
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            // registry 所有的注册信息存储地址
            // gMap 通过服务名拿到的微服务组
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 已经存在的微服务实例对象
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 当前存在的微服务实例对象的最后操作时间戳
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // 传过来的注册实例的时间戳
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // 那个时间戳比较靠前就用新的
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // getHolder() 具体的微服务实例对象
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // 如果冲突了,自我保护阈值更新
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Lease 租债器对象
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // 更新最后正常工作时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 服务注册
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }


二. 服务下架


启动定时器,去判断服务是否过期,判断当前时间是否大于过期时间 lastUpdateTimestamp。


判断公式: 当前系统时间 > lastUpdateTimestamp + 30s (服务过期时间)


  1. 服务剔除


// 遍历所有的服务信息 15分钟一次, 15% 以上需要剔除
public void evict(long additionalLeaseMs) {
  logger.debug("Running the evict task");
  if (!isLeaseExpirationEnabled()) {
    logger.debug("DS: lease expiration is currently disabled.");
    return;
  }
  // We collect first all expired items, to evict them in random order. For large eviction sets,
  // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
  // the impact should be evenly distributed across all applications.
  // 遍历所有的服务信息,判断是否过期放入过期的列表中
  List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
  for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
    Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
    if (leaseMap != null) {
      for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
        Lease<InstanceInfo> lease = leaseEntry.getValue();
        // 判断是否服务过期
        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
          expiredLeases.add(lease);
        }
      }
    }
  }
  // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
  // triggering self-preservation. Without that we would wipe out full registry.
  // 剔除的数量过大的话, 先剔除一部分
  int registrySize = (int) getLocalRegistrySize();
  // serverConfig.getRenewalPercentThreshold() 默认 85%
  int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
  int evictionLimit = registrySize - registrySizeThreshold;
  int toEvict = Math.min(expiredLeases.size(), evictionLimit);
  if (toEvict > 0) {
    logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    // 随机剔除, 类似洗牌算法
    Random random = new Random(System.currentTimeMillis());
    for (int i = 0; i < toEvict; i++) {
      // Pick a random item (Knuth shuffle algorithm)
      int next = i + random.nextInt(expiredLeases.size() - i);
      Collections.swap(expiredLeases, i, next);
      Lease<InstanceInfo> lease = expiredLeases.get(i);
      String appName = lease.getHolder().getAppName();
      String id = lease.getHolder().getId();
      EXPIRED.increment();
      logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
      internalCancel(appName, id, false);
    }
  }
}


三. 心跳连接


更改最后操作时间


// InstanceResource
@PUT
public Response renewLease(
  @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
  @QueryParam("overriddenstatus") String overriddenStatus,
  @QueryParam("status") String status,
  @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
  boolean isFromReplicaNode = "true".equals(isReplication);
  boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
  // Not found in the registry, immediately ask for a register
  if (!isSuccess) {
    logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
    return Response.status(Status.NOT_FOUND).build();
  }
  // Check if we need to sync based on dirty time stamp, the client
  // instance might have changed some value
  Response response;
  if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
    response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    // Store the overridden status since the validation found out the node that replicates wins
    if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
        && (overriddenStatus != null)
        && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
        && isFromReplicaNode) {
      registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    }
  } else {
    response = Response.ok().build();
  }
  logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
  return response;
}


相关文章
|
9天前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
7天前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
7天前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
|
2月前
|
XML Java 开发者
Spring底层架构核心概念解析
理解 Spring 框架的核心概念对于开发和维护 Spring 应用程序至关重要。IOC 和 AOP 是其两个关键特性,通过依赖注入和面向切面编程实现了高效的模块化和松耦合设计。Spring 容器管理着 Beans 的生命周期和配置,而核心模块为各种应用场景提供了丰富的功能支持。通过全面掌握这些核心概念,开发者可以更加高效地利用 Spring 框架开发企业级应用。
89 18
|
15天前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
114 0
|
2月前
|
存储 弹性计算 安全
Cloud Backup深度解析:从被动防御到主动保护
《Cloud Backup深度解析:从被动防御到主动保护》由阿里云高级技术专家张磊分享,探讨企业数据保护面临的挑战及应对策略。内容涵盖企业数据安全威胁、小概率事件的高风险性、传统备份系统的不足,以及通过四步主动防御策略(资源发现、风险检测、数据锁定、全局巡检)实现高效的数据保护。同时介绍了基于标签的自动策略关联、多种备份引擎、恶意文件检测、探测效率优化等关键技术,确保备份数据的安全性和完整性。此外,还展示了数据灾备中心和全方位主动数据保护机制,帮助企业在面对勒索病毒、内部攻击等威胁时,构建更强大的防护体系。
|
1月前
|
传感器 监控 安全
智慧工地云平台的技术架构解析:微服务+Spring Cloud如何支撑海量数据?
慧工地解决方案依托AI、物联网和BIM技术,实现对施工现场的全方位、立体化管理。通过规范施工、减少安全隐患、节省人力、降低运营成本,提升工地管理的安全性、效率和精益度。该方案适用于大型建筑、基础设施、房地产开发等场景,具备微服务架构、大数据与AI分析、物联网设备联网、多端协同等创新点,推动建筑行业向数字化、智能化转型。未来将融合5G、区块链等技术,助力智慧城市建设。
|
3月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
3月前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
3月前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析

热门文章

最新文章

推荐镜像

更多