Eureka 相关的历程
Spring Cloud Netflix 部分项目停止更新说明
大致就是说:eureka 2.x 已停止开发、不再更新,而 eureka 1.x 仍会继续维护。
Spring Cloud Hoxton 发布说明
Hoxton 版本主要特征是支持响应式编程。
集群模式配置
版本概述:
spring-boot 2.4.2
spring-cloud 2020.0.1
服务端配置
- pom.xml
<!-- Server-side dependencies. The starter carries no <version>: it is resolved through the
     spring-cloud-dependencies BOM imported in <dependencyManagement> below. -->
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <!-- Spring Boot BOM, 2.4.2. The spring-boot.version property is expected to be
             declared in <properties>, which is not part of this snippet. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Cloud BOM, 2020.0.1. The spring-cloud.version property is likewise
             expected to be declared in <properties>. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
- application.yml
server:
  port: 3001

eureka:
  server:
    enable-self-preservation: false      # turn off the self-preservation mechanism
    eviction-interval-timer-in-ms: 4000  # eviction interval, in milliseconds
  instance:
    # NOTE(review): the hostname says "3000" while this node listens on port 3001 and every
    # peer URL below uses the host eureka3001.com - confirm the intended host names.
    hostname: eureka3000
  client:
    register-with-eureka: false  # do not register this server with itself as a client
    fetch-registry: false        # no need to fetch registry information from a server
    service-url:
      # The key must be the camelCase "defaultZone". service-url is a map, and map keys are
      # not normalised by relaxed binding, so a "default-zone" key is never looked up and
      # the peer list below would be silently ignored.
      defaultZone: http://eureka3001.com:3001/eureka,http://eureka3001.com:3002/eureka,http://eureka3001.com:3003/eureka
- 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

/**
 * Bootstrap class of the Eureka server node.
 *
 * <p>{@code @EnableEurekaServer} turns this Spring Boot application into a Eureka registry.
 */
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; they are forwarded to Spring Boot so that overrides
     *             such as {@code --server.port=3002} are honoured, which is what allows several
     *             cluster nodes to be started from the same build
     */
    public static void main(String[] args) {
        // Pass args through: without them every command-line property override is dropped.
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}
客户端配置
- pom.xml
<!-- Client-side dependencies. Neither entry declares a <version>; presumably both are managed
     by BOMs imported elsewhere in the build (not shown in this snippet). -->
<dependencies>
    <!-- Eureka client starter -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
- yml
server:
  port: 6000

eureka:
  client:
    serviceUrl:
      # Caution: the map key below must be the camelCase "defaultZone"; written with a hyphen
      # ("default-zone") the setting does not take effect.
      # NOTE(review): the server snippet in these notes listens on port 3001, not 3000 -
      # confirm which registry address the client should use.
      defaultZone: http://127.0.0.1:3000/eureka  # registration address exposed by the Eureka server
  instance:
    instance-id: power-1      # unique ID under which this instance is registered on the Eureka server
    prefer-ip-address: true   # whether to show the IP address
    # How often the client sends a heartbeat to the Eureka server to signal it is still alive.
    # Default is 30 seconds (this and the next setting are both in seconds).
    leaseRenewalIntervalInSeconds: 10
    # How long the Eureka server waits after the last heartbeat from this instance before it
    # may remove the instance. Default is 90 seconds.
    leaseExpirationDurationInSeconds: 30

spring:
  application:
    # NOTE(review): "servcie" looks like a typo for "service"; left unchanged because other
    # services may already look this name up - confirm before renaming.
    name: servcie-client
- 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the Eureka client (service) application.
 */
@SpringBootApplication
public class EurekaClientApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; they are forwarded to Spring Boot so that overrides
     *             such as {@code --server.port=6001} are honoured when several instances of
     *             the service are started from the same build
     */
    public static void main(String[] args) {
        // Pass args through: without them every command-line property override is dropped.
        SpringApplication.run(EurekaClientApplication.class, args);
    }
}
Spring Boot 自定义 starter
Eureka Server 源码
Eureka 没有采用 Spring MVC 作为 Web 通讯框架,而是采用 Jersey 作为底层框架;它与 Spring MVC 的区别在于,请求的派发是通过 Filter 来处理的。
一. 服务注册
注册信息的存储,new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
第一个 String 对应服务名,内层 Map 的 String 代表实例的 id;Lease 表示租约,InstanceInfo 表示服务实例信息。
// Service registration code
// AbstractInstanceRegistry
/**
 * Registers an instance in the in-memory registry, or replaces its existing lease.
 *
 * <p>The whole operation runs while holding {@code read}'s lock, which is released in the
 * {@code finally} block.
 *
 * @param registrant    the registration information sent with this registration request; it is
 *                      swapped for the already-stored instance when that one has the greater
 *                      dirty timestamp
 * @param leaseDuration duration handed to the new {@code Lease}
 * @param isReplication forwarded to the register counter, to the overridden-status rules and
 *                      to the log line; presumably {@code true} when the call was replicated
 *                      from a peer node (confirm against callers)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    read.lock();
    try {
        // registry: where all registration information is stored
        // gMap: the group of instances looked up by application (service) name
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // First instance of this application: install a new inner map. putIfAbsent returns
            // the map that was already there if one exists, in which case that one is used.
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // The lease already stored for this instance id, if any
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // Last dirty timestamp of the instance currently stored
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            // Last dirty timestamp of the instance sent with this request
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            // i.e. only when the stored copy's timestamp is strictly greater is the stored
            // instance kept in place of the incoming one.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                // getHolder(): the concrete instance object held by the lease
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // The expected client count changed, so refresh the threshold (presumably the
                    // self-preservation renews-per-minute threshold; the helper is not shown here).
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // The new Lease object wrapping the registrant
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // Carry the service-up timestamp over from the previous lease
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // The actual registration: store the lease under the instance id
        gMap.put(registrant.getId(), lease);
        // Record (current time, "appName(id)") in the recently-registered queue
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // An overridden status already present in the map is written back onto the registrant
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Mark the change as ADDED and queue it in the recently-changed queue
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate cached data for this app name and its VIP addresses
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
二. 服务下架
启动定时器,定期判断服务是否过期:即判断当前时间是否已超过最后更新时间 lastUpdateTimestamp 加上服务过期时间。
判断公式: 当前系统时间 > lastUpdateTimestamp + 30s (服务过期时间,即上文客户端配置的 leaseExpirationDurationInSeconds,默认为 90s)
- 服务剔除
// 遍历所有的服务信息 15分钟一次, 15% 以上需要剔除 public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. // 遍历所有的服务信息,判断是否过期放入过期的列表中 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判断是否服务过期 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. 
// 剔除的数量过大的话, 先剔除一部分 int registrySize = (int) getLocalRegistrySize(); // serverConfig.getRenewalPercentThreshold() 默认 85% int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); // 随机剔除, 类似洗牌算法 Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } }
三. 心跳连接
更改最后操作时间
// InstanceResource @PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value Response response; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; }