SpringCloud源码剖析-Eureka Server初始化流程

简介: Eureka服务端上下文对象,包含了初始化,关闭,获取服务配置,获取集群节点,获取服务注册器,获取服务信息管理器等方法,默认实现类是DefaultEurekaServerContext

前言

上一章我们分析了一下EureakServer的自动配置,这章节我们来详细分析一下Eureak Server中的核心组件以及初始化流程


一.Eureka Server 核心组件介绍

1.EurekaServerContext

Eureka服务端上下文对象,包含了初始化,关闭,获取服务配置,获取集群节点,获取服务注册器,获取服务信息管理器等方法,默认实现类是DefaultEurekaServerContext

publicinterfaceEurekaServerContext {
//初始化voidinitialize() throwsException;
//关闭voidshutdown() throwsException;
//获取服务配置EurekaServerConfiggetServerConfig();
//获取集群节点管理管理类PeerEurekaNodesgetPeerEurekaNodes();
//服务器编解码器ServerCodecsgetServerCodecs();
//服务注册器PeerAwareInstanceRegistrygetRegistry();
//instanceInfo实例信息管理器ApplicationInfoManagergetApplicationInfoManager();
}

DefaultEurekaServerContext实现类代码

/*** Represent the local server context and exposes getters to components of the* local server such as the registry.** @author David Liu*/@SingletonpublicclassDefaultEurekaServerContextimplementsEurekaServerContext {
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(DefaultEurekaServerContext.class);
privatefinalEurekaServerConfigserverConfig;
privatefinalServerCodecsserverCodecs;
privatefinalPeerAwareInstanceRegistryregistry;
privatefinalPeerEurekaNodespeerEurekaNodes;
privatefinalApplicationInfoManagerapplicationInfoManager;
@InjectpublicDefaultEurekaServerContext(EurekaServerConfigserverConfig,
ServerCodecsserverCodecs,
PeerAwareInstanceRegistryregistry,
PeerEurekaNodespeerEurekaNodes,
ApplicationInfoManagerapplicationInfoManager) {
this.serverConfig=serverConfig;
this.serverCodecs=serverCodecs;
this.registry=registry;
this.peerEurekaNodes=peerEurekaNodes;
this.applicationInfoManager=applicationInfoManager;
    }
//  @PostConstruct :EurekaServerContext初始化的时候initialize方法被执行,调用 peerEurekaNodes.start();开启EurekaServer的初始化,//然后再调用 peerAwareInstanceRegistry的.init(peerEurekaNodes);方法初始化@PostConstruct@Overridepublicvoidinitialize() {
logger.info("Initializing ...");
//PeerEurekaNodes开始初始化peerEurekaNodes.start();
try {
//peerAwareInstanceRegistry开始初始化registry.init(peerEurekaNodes);
        } catch (Exceptione) {
thrownewRuntimeException(e);
        }
logger.info("Initialized");
    }
//EurekaServerContext销毁之前(@PreDestroy)调用shutdown,//peerAwareInstanceRegistry 注册器的shutdown执行关闭流程@PreDestroy@Overridepublicvoidshutdown() {
logger.info("Shutting down ...");
//服务注册器关闭registry.shutdown();
//peerEurekaNodes集群节点关闭peerEurekaNodes.shutdown();
logger.info("Shut down");
    }
    ...省略...
}

DefaultEurekaServerContext的initialize初始化方法中做的事情就是在初始化的时候,调用peerEurekaNodes.start();初始化集群节点, 调用PeerAwareInstanceRegistry.init初始化注册器,在shutdown销毁方法中调用PeerAwareInstanceRegistry.shudown执行注册器的关闭流程,调用peerEurekaNodes.shutdown执行集群节点的关闭

2.PeerEurekaNodes

PeerEurekaNodes用来管理Eureka集群节点PeerEurekaNode生命周期的工具被DefaultEurekaServerContext 的initialize初始化方法中执行,源码如下

/*** Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.** @author Tomasz Bak*/@SingletonpublicclassPeerEurekaNodes {
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(PeerEurekaNodes.class);
//服务注册接口protectedfinalPeerAwareInstanceRegistryregistry;
//服务端配置对象protectedfinalEurekaServerConfigserverConfig;
//客户端配置protectedfinalEurekaClientConfigclientConfig;
protectedfinalServerCodecsserverCodecs;
//InstanceInfo实例管理器privatefinalApplicationInfoManagerapplicationInfoManager;
//Eureka集群节点集合privatevolatileList<PeerEurekaNode>peerEurekaNodes=Collections.emptyList();
//Eureka集群节点的url集合privatevolatileSet<String>peerEurekaNodeUrls=Collections.emptySet();
//定时任务执行器privateScheduledExecutorServicetaskExecutor;
//初始化节点工具@InjectpublicPeerEurekaNodes(
PeerAwareInstanceRegistryregistry,
EurekaServerConfigserverConfig,
EurekaClientConfigclientConfig,
ServerCodecsserverCodecs,
ApplicationInfoManagerapplicationInfoManager) {
this.registry=registry;
this.serverConfig=serverConfig;
this.clientConfig=clientConfig;
this.serverCodecs=serverCodecs;
this.applicationInfoManager=applicationInfoManager;
    }
//获取集群节点集合,不可修改publicList<PeerEurekaNode>getPeerNodesView() {
returnCollections.unmodifiableList(peerEurekaNodes);
    }
//获取集群节点集合publicList<PeerEurekaNode>getPeerEurekaNodes() {
returnpeerEurekaNodes;
    }
//此实例提供对等复制实例的最小数量,被认为是健康的publicintgetMinNumberOfAvailablePeers() {
returnserverConfig.getHealthStatusMinNumberOfAvailablePeers();
    }
//开始publicvoidstart() {
//创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器taskExecutor=Executors.newSingleThreadScheduledExecutor(
newThreadFactory() {
@OverridepublicThreadnewThread(Runnabler) {
Threadthread=newThread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
returnthread;
                    }
                }
        );
try {
//更新集群中的节点中的注册信息updatePeerEurekaNodes(resolvePeerUrls());
//创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());RunnablepeersUpdateTask=newRunnable() {
@Overridepublicvoidrun() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwablee) {
logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
//taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
//定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTESserverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS            );
        } catch (Exceptione) {
thrownewIllegalStateException(e);
        }
for (PeerEurekaNodenode : peerEurekaNodes) {
logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
//关闭,关闭节点更新的定时任务,清空peerEurekaNodes ,peerEurekaNodeUrls ,调用每个节点的shutDown方法publicvoidshutdown() {
taskExecutor.shutdown();
List<PeerEurekaNode>toRemove=this.peerEurekaNodes;
this.peerEurekaNodes=Collections.emptyList();
this.peerEurekaNodeUrls=Collections.emptySet();
for (PeerEurekaNodenode : toRemove) {
node.shutDown();
        }
    }
/**基于相同的Zone得到Eureka集群中多个节点的url,过滤掉当前节点* Resolve peer URLs.** @return peer URLs with node's own URL filtered out*/protectedList<String>resolvePeerUrls() {
InstanceInfomyInfo=applicationInfoManager.getInfo();
Stringzone=InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
//配置的eureka地址urlList<String>replicaUrls=EndpointUtils                .getDiscoveryServiceUrls(clientConfig, zone, newEndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
intidx=0;
while (idx<replicaUrls.size()) {
//移除当前eureka节点的urlif (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
            } else {
idx++;
            }
        }
returnreplicaUrls;
    }
/**更新集群节点列表的方法,在定时器中被执行,newPeerUrls是集群中的eureka server节点的url,过滤了本地节点的url做法是删除老的不可用的节点调用shutDown方法,使用createPeerEurekaNode创建新的节点添加新的节点* Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and* create new ones.** @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out*///修改集群节点protectedvoidupdatePeerEurekaNodes(List<String>newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
        }
//需要关闭的节点Set<String>toShutdown=newHashSet<>(peerEurekaNodeUrls);
//移除掉新的节点,新的节点不需要关闭toShutdown.removeAll(newPeerUrls);
//新的节点需要添加Set<String>toAdd=newHashSet<>(newPeerUrls);
//新的节点中移除老的节点toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() &&toAdd.isEmpty()) { // No changereturn;
        }
// Remove peers no long available   移除不可用的节点//节点集合,本地缓存的所有节点List<PeerEurekaNode>newNodeList=newArrayList<>(peerEurekaNodes);
//如果需要关闭的节点集合不为空if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
inti=0;
while (i<newNodeList.size()) {
PeerEurekaNodeeurekaNode=newNodeList.get(i);
//如果当前节点需要关闭,包含在toShutdown中if (toShutdown.contains(eurekaNode.getServiceUrl())) {
//从newNodeList中移除掉newNodeList.remove(i);
//执行节点的关闭方法eurekaNode.shutDown();
                } else {
i++;
                }
            }
        }
// Add new peers  如果需要添加新的节点if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (StringpeerUrl : toAdd) {
//调用 createPeerEurekaNode 创建新的节点,添加到节点集合中newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
this.peerEurekaNodes=newNodeList;
this.peerEurekaNodeUrls=newHashSet<>(newPeerUrls);
    }
//创建集群节点PeerEurekaNode protectedPeerEurekaNodecreatePeerEurekaNode(StringpeerEurekaNodeUrl) {
HttpReplicationClientreplicationClient=JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
StringtargetHost=hostFromUrl(peerEurekaNodeUrl);
if (targetHost==null) {
targetHost="host";
        }
returnnewPeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }
...省略...

PeerEurekaNodes主要定义了eureka集群节点更新逻辑,通过定时任务定时更新,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutdown做关闭操作,新的节点调用createPeerEurekaNode进行创建,集群节点最终存储在List<PeerEurekaNode>结构中

3.PeerAwareInstanceRegistry

PeerAwareInstanceRegistry翻译为“对等感知实例注册表” ,其实就是服务注册器,只是这个注册器会考虑集群中的其它节点的数据同步,

@BeanpublicPeerAwareInstanceRegistrypeerAwareInstanceRegistry(
ServerCodecsserverCodecs) {
this.eurekaClient.getApplications(); // force initializationreturnnewInstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

看一下继承关系:

  • LookupService:服务查找接口,提供了服务的方法,在之前有介绍过,在客户端EurekaClient继承该接口,在服务端InstanceRegistry继承了该接口
publicinterfaceLookupService<T> {
//获取应用ApplicationgetApplication(Stringvar1);
//获取应用注册表ApplicationsgetApplications();
//获取实例信息列表List<InstanceInfo>getInstancesById(Stringvar1);
//获取下一台服务实例信息InstanceInfogetNextServerFromEureka(Stringvar1, booleanvar2);
}
  • LeaseManager:租约管理器接口,提供了register注册,cancel取消,renew续约,evict过期等服务相关的操作
publicinterfaceLeaseManager<T> {
//注册voidregister(Tr, intleaseDuration, booleanisReplication);
//取消,下线booleancancel(StringappName, Stringid, booleanisReplication);
//续约booleanrenew(StringappName, Stringid, booleanisReplication);
//过期voidevict();
}
  • InstanceRegistry:应用实例注册表接口继承了 LookupService 、LeaseManager ,提供应用实例的注册与发现服务,
publicinterfaceInstanceRegistryextendsLeaseManager<InstanceInfo>, LookupService<String> {
//允许开始传输数据voidopenForTraffic(ApplicationInfoManagerapplicationInfoManager, intcount);
//关闭voidshutdown();
@DeprecatedvoidstoreOverriddenStatusIfRequired(Stringid, InstanceStatusoverriddenStatus);
//存储实例 覆盖状态:使用的是InstanceInfo.InstanceStatus overriddenStatus 覆盖状态,//使用该状态来修改注册中心服务的注册状态·  ·voidstoreOverriddenStatusIfRequired(StringappName, Stringid, InstanceStatusoverriddenStatus);
//更新服务注册状态booleanstatusUpdate(StringappName, Stringid, InstanceStatusnewStatus,
StringlastDirtyTimestamp, booleanisReplication);
//删除覆盖的状态booleandeleteStatusOverride(StringappName, Stringid, InstanceStatusnewStatus,
StringlastDirtyTimestamp, booleanisReplication);
//服务状态快照Map<String, InstanceStatus>overriddenInstanceStatusesSnapshot();
//获取本地服务注册表,从本地ConcurrentHashMap缓存的服务注册表中获取ApplicationsgetApplicationsFromLocalRegionOnly();
//获取服务注册表List<Application>getSortedApplications();
//根据名字获取服务ApplicationgetApplication(StringappName, booleanincludeRemoteRegion);
//根据名字和id获取实例信息InstanceInfogetInstanceByAppAndId(StringappName, Stringid);
//根据名字和id获取实例信息InstanceInfogetInstanceByAppAndId(StringappName, Stringid, booleanincludeRemoteRegions);
//完全清除注册表//overriddenInstanceStatusMap.clear();  覆盖状态清除//recentCanceledQueue.clear();      最近取消队列//recentRegisteredQueue.clear();    最近注册队列//recentlyChangedQueue.clear();     最近更改队列//registry.clear();                 清除注册表voidclearRegistry();
//初始化的响应缓存voidinitializedResponseCache();
//获取响应缓存ResponseCachegetResponseCache();
//最后一分钟续约次数,用作自我保护计算值longgetNumOfRenewsInLastMin();
//获取每分钟续约次数,用作自我保护计算值intgetNumOfRenewsPerMinThreshold();
//检查续订次数是否小于阈值。intisBelowRenewThresold();
//最近注册的实例List<Pair<Long, String>>getLastNRegisteredInstances();
//最近取消的实例List<Pair<Long, String>>getLastNCanceledInstances();
//最近过期的实例booleanisLeaseExpirationEnabled();
//是否开启自我保护booleanisSelfPreservationModeEnabled();
}
  • AbstractInstanceRegistry:InstanceRegistry的实现类,应用对象注册表抽象,处理客户端的注册请求,包括 register注册,Renewals续约,Cancels下线,Expirations过期,Status Changes状态改变,服务注册表以增量的方式增加
  • PeerAwareInstanceRegistry: InstanceRegistry的子接口,应用对象注册表接口,实现了 Eureka-Server 集群内注册信息同步功能
publicinterfacePeerAwareInstanceRegistryextendsInstanceRegistry {
//初始化PeerEurekaNodes 集群节点voidinit(PeerEurekaNodespeerEurekaNodes) throwsException;
//注册表信息同步, 如果节点之间通信失败,列表中耗尽该操作故障转移到其他节点intsyncUp();
//检查是否有访问权限booleanshouldAllowAccess(booleanremoteRegionRequired);
//注册InstanceInfo到其他Eureka节点voidregister(InstanceInfoinfo, booleanisReplication);
//修改状态voidstatusUpdate(finalStringasgName, finalASGResource.ASGStatusnewStatus, finalbooleanisReplication);
}
  • PeerAwareInstanceRegistryImpl:PeerAwareInstanceRegistry的子类,,应用对象注册的具体实现,同时继承了AbstractInstanceRegistry
  • InstanceRegistry :PeerAwareInstanceRegistryImpl的子类,

有些实现类没拉开看,后面会详细分析

4.PeerAwareInstanceRegistryImpl

服务注册器,继承AbstractInstanceRegistry抽象类, 实现 PeerAwareInstanceRegistry服务注册接口,包含了服务注册,续约,下线,过期,状态改变等等功能。

/**集群之间节点同步的服务注册器, 所有操作都在其父类 AbstractInstanceRegistry 中,* Handles replication of all operations to {@link AbstractInstanceRegistry} to peer* <em>Eureka</em> nodes to keep them all in sync.** <p>* 主要操作是副本的注册,续约,取消,到期和状态更改* Primary operations that are replicated are the* <em>Registers,Renewals,Cancels,Expirations and Status Changes</em>* </p>** <p>* 当eureka服务器启动时,它将尝试从对等的eureka节点获取所有注册表信息,如果由于某种原因该操作失败,* 则服务器将不允许用户在指定的时间段内获取注册表信息。* When the eureka server starts up it tries to fetch all the registry* information from the peer eureka nodes.If for some reason this operation* fails, the server does not allow the user to get the registry information for* a period specified in* {@link com.netflix.eureka.EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}.* </p>** <p>* *关于续约的重要注意事项。如果续约失败次数超过EurekaServerConfig.getRenewalPercentThreshold()中指定的指定阈值,则在EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()时间内,eureka将其视为危险,并停止实例过期* * One important thing to note about <em>renewals</em>.If the renewal drops more* than the specified threshold as specified in* {@link com.netflix.eureka.EurekaServerConfig#getRenewalPercentThreshold()} within a period of* {@link com.netflix.eureka.EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()}, eureka* perceives this as a danger and stops expiring instances.* </p>** @author Karthik Ranganathan, Greg Kim**/@SingletonpublicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry {
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);
privatestaticfinalStringUS_EAST_1="us-east-1";
privatestaticfinalintPRIME_PEER_NODES_RETRY_MS=30000;
privatelongstartupTime=0;
privatebooleanpeerInstancesTransferEmptyOnStartup=true;
//把功能抽成枚举,心跳检查,注册,取消注册,状态改变,删除覆盖状态publicenumAction {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
privatecom.netflix.servo.monitor.Timertimer=Monitors.newTimer(this.name());
publiccom.netflix.servo.monitor.TimergetTimer() {
returnthis.timer;
        }
    }
privatestaticfinalComparator<Application>APP_COMPARATOR=newComparator<Application>() {
publicintcompare(Applicationl, Applicationr) {
returnl.getName().compareTo(r.getName());
        }
    };
privatefinalMeasuredRatenumberOfReplicationsLastMin;
//客户端protectedfinalEurekaClienteurekaClient;
//集群节点管理protectedvolatilePeerEurekaNodespeerEurekaNodes;
privatefinalInstanceStatusOverrideRuleinstanceStatusOverrideRule;
privateTimertimer=newTimer(
"ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);
@InjectpublicPeerAwareInstanceRegistryImpl(
EurekaServerConfigserverConfig,
EurekaClientConfigclientConfig,
ServerCodecsserverCodecs,
EurekaClienteurekaClient    ) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient=eurekaClient;
//最后一分钟的复制次数this.numberOfReplicationsLastMin=newMeasuredRate(1000*60*1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,// then we check the status of a potentially existing lease.this.instanceStatusOverrideRule=newFirstMatchWinsCompositeRule(newDownOrStartingRule(),
newOverrideExistsRule(overriddenInstanceStatusMap), newLeaseExistsRule());
    }
@OverrideprotectedInstanceStatusOverrideRulegetInstanceInfoOverrideRule() {
returnthis.instanceStatusOverrideRule;
    }
//初始化方法@Overridepublicvoidinit(PeerEurekaNodespeerEurekaNodes) throwsException {
//最后一分钟的复制次数定时器Timer开始this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes=peerEurekaNodes;
//初始化 ResponseCache ,负责缓存客户端查询的注册表信息 30s/1次initializedResponseCache();
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新scheduleRenewalThresholdUpdateTask();
//初始化远程注册表,默认么有远程RegioninitRemoteRegionRegistry();
try {
//注册到对象监视器Monitors.registerObject(this);
        } catch (Throwablee) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }
/**执行所有清理和关闭操作。* Perform all cleanup and shutdown operations.*/@Overridepublicvoidshutdown() {
try {
//注销对象监视DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this));
        } catch (Throwablet) {
logger.error("Cannot shutdown monitor registry", t);
        }
try {
//集群节点关闭peerEurekaNodes.shutdown();
        } catch (Throwablet) {
logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t);
        }
//最后一分钟的复制次数定时器 Timer停止numberOfReplicationsLastMin.stop();
//执行所有清理和关闭操作。//deltaRetentionTimer.cancel();  增量保留计时器//evictionTimer.cancel(); 服务剔除计时器//renewsLastMin.stop();  最后一分钟的复制次数机器停止super.shutdown();
    }
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新privatevoidscheduleRenewalThresholdUpdateTask() {
timer.schedule(newTimerTask() {
@Overridepublicvoidrun() {
updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
    }
/**集群数据同步,从集群中eureka节点复制注册表信息。如果通信失败,此操作将故障转移到其他节点,直到列表用尽。* Populates the registry information from a peer eureka node. This* operation fails over to other nodes until the list is exhausted if the* communication fails.*/@OverridepublicintsyncUp() {
// Copy entire entry from neighboring DS nodeintcount=0;
//getRegistrySyncRetries重试次数默认5次for (inti=0; ((i<serverConfig.getRegistrySyncRetries()) && (count==0)); i++) {
if (i>0) {
try {
//通信中断,等待下一次切换实例Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedExceptione) {
logger.warn("Interrupted during registry transfer..");
break;
                }
            }
//获取注册表Applicationsapps=eurekaClient.getApplications();
//循环服务列表,依次注册for (Applicationapp : apps.getRegisteredApplications()) {
for (InstanceInfoinstance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
                        }
                    } catch (Throwablet) {
logger.error("During DS init copy", t);
                    }
                }
            }
        }
returncount;
    }
//运行开始传输数据@OverridepublicvoidopenForTraffic(ApplicationInfoManagerapplicationInfoManager, intcount) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.//每分钟的预期续订次数 2次,30s/一次续约this.expectedNumberOfRenewsPerMin=count*2;
//每分钟续约次数阈值  = expectedNumberOfRenewsPerMin每分钟续约次数 * 85%//如果客户端续约低于这个阈值,将会开启服务端的自我保护功能this.numberOfRenewsPerMinThreshold=                (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold());
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime=System.currentTimeMillis();
if (count>0) {
this.peerInstancesTransferEmptyOnStartup=false;
        }
DataCenterInfo.NameselfName=applicationInfoManager.getInfo().getDataCenterInfo().getName();
booleanisAws=Name.Amazon==selfName;
if (isAws&&serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
        }
//改变服务的状态为UPlogger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
//这里使用定时任务开启新的 服务剔除任务super.postInit();
    }
//取消注册,服务下线@Overridepublicbooleancancel(finalStringappName, finalStringid,
finalbooleanisReplication) {
//调用父类的下线方法if (super.cancel(appName, id, isReplication)) {
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin>0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)//客户下线,降低续约阈值this.expectedNumberOfRenewsPerMin=this.expectedNumberOfRenewsPerMin-2;
this.numberOfRenewsPerMinThreshold=                            (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold());
                }
            }
returntrue;
        }
returnfalse;
    }
//服务注册@Overridepublicvoidregister(finalInstanceInfoinfo, finalbooleanisReplication) {
intleaseDuration=Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() !=null&&info.getLeaseInfo().getDurationInSecs() >0) {
leaseDuration=info.getLeaseInfo().getDurationInSecs();
        }
//调用父类的注册super.register(info, leaseDuration, isReplication);
//注册信息同步到集群中其他节点replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
//续约publicbooleanrenew(finalStringappName, finalStringid, finalbooleanisReplication) {
//调用父类的续约if (super.renew(appName, id, isReplication)) {
//同步到集群中的其他节点replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
returntrue;
        }
returnfalse;
    }
//修改服务状态@OverridepublicbooleanstatusUpdate(finalStringappName, finalStringid,
finalInstanceStatusnewStatus, StringlastDirtyTimestamp,
finalbooleanisReplication) {
if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
//状态同步到其他节点replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
returntrue;
        }
returnfalse;
    }
//删除状态@OverridepublicbooleandeleteStatusOverride(StringappName, Stringid,
InstanceStatusnewStatus,
StringlastDirtyTimestamp,
booleanisReplication) {
if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
returntrue;
        }
returnfalse;
    }
//是否启用租约到期@OverridepublicbooleanisLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.returntrue;
        }
returnnumberOfRenewsPerMinThreshold>0&&getNumOfRenewsInLastMin() >numberOfRenewsPerMinThreshold;
    }
//更新续约阈值privatevoidupdateRenewalThreshold() {
try {
Applicationsapps=eurekaClient.getApplications();
//统计有多少个实例intcount=0;
for (Applicationapp : apps.getRegisteredApplications()) {
for (InstanceInfoinstance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
                    }
                }
            }
synchronized (lock) {
//仅当阈值大于当前的预期阈值,或禁用了自我保留时才更新阈值。// Update threshold only if the threshold is greater than the// current expected threshold or if self preservation is disabled.if ((count*2) > (serverConfig.getRenewalPercentThreshold() *expectedNumberOfRenewsPerMin)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin=count*2;
this.numberOfRenewsPerMinThreshold= (int) ((count*2) *serverConfig.getRenewalPercentThreshold());
                }
            }
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwablee) {
logger.error("Cannot update renewal threshold", e);
        }
    }
/**集群之间的节点复制* Replicates all eureka actions to peer eureka nodes except for replication* traffic to this node.**/privatevoidreplicateToPeers(Actionaction, StringappName, Stringid,
InstanceInfoinfo/* optional */,
InstanceStatusnewStatus/* optional */, booleanisReplication) {
Stopwatchtracer=action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
            }
// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes==Collections.EMPTY_LIST||isReplication) {
return;
            }
for (finalPeerEurekaNodenode : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
                }
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
tracer.stop();
        }
    }
/**集群之间的节点复制* Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/privatevoidreplicateInstanceActionsToPeers(Actionaction, StringappName,
Stringid, InstanceInfoinfo, InstanceStatusnewStatus,
PeerEurekaNodenode) {
try {
InstanceInfoinfoFromRegistry=null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
caseCancel:
node.cancel(appName, id);
break;
caseHeartbeat:
InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id);
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
caseRegister:
node.register(info);
break;
caseStatusUpdate:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
caseDeleteStatusOverride:
infoFromRegistry=getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
            }
        } catch (Throwablet) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
  ....省略一些代码....
}

这个服务注册器实现类看起来很复杂它做了那些事情呢

  • init初始化:注册表缓存ResponseCache初始化,续约阈值定时更新任务初始化,初始化远程注册表
  • showdown:执行所有清理和关闭操作
  • syncUp:集群之间的数据同步节点复制
  • cancel:服务下线,并同步到其他节点
  • register:服务注册,并同步到其他节点
  • renew: 续约,并同步到其他节点

5.EurekaServerInitializerConfiguration

EurekaServerAutoConfiguration 通过 @Import(EurekaServerInitializerConfiguration.class)进行初始化,EurekaServerInitializerConfiguration实现了SmartLifecycle,其中的start方法会再Spring启动过程中,执行LifecycleProcessor().onRefresh()生命周期处理器刷新的时候被调用,然后再调用EurekaServerBootstrap.contextInitialized进行初始化Eureka和启动Eureka

/*** @author Dave Syer*/@ConfigurationpublicclassEurekaServerInitializerConfigurationimplementsServletContextAware, SmartLifecycle, Ordered {
privatestaticfinalLoglog=LogFactory.getLog(EurekaServerInitializerConfiguration.class);
//EurekaServer 配置@AutowiredprivateEurekaServerConfigeurekaServerConfig;
//Servlet上下文privateServletContextservletContext;
//应用上下文对象@AutowiredprivateApplicationContextapplicationContext;
//启动引导@AutowiredprivateEurekaServerBootstrapeurekaServerBootstrap;
privatebooleanrunning;
privateintorder=1;
//初始化Servlet上下文@OverridepublicvoidsetServletContext(ServletContextservletContext) {
this.servletContext=servletContext;
    }
//开始方法,复写于 SmartLifecycle 在Spring启动的时候,该方法会被地调用,@Overridepublicvoidstart() {
newThread(newRunnable() {
@Overridepublicvoidrun() {
try {
//TODO: is this class even needed now?//初始化EurekaServer上下文,启动EurekaServereurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布一个EurekaRegistryAvailableEvent注册事件publish(newEurekaRegistryAvailableEvent(getEurekaServerConfig()));
//改变running状态trueEurekaServerInitializerConfiguration.this.running=true;
//发布EurekaServer启动事件EurekaServerStartedEventpublish(newEurekaServerStartedEvent(getEurekaServerConfig()));
                }
catch (Exceptionex) {
// Help!log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }
privateEurekaServerConfiggetEurekaServerConfig() {
returnthis.eurekaServerConfig;
    }
privatevoidpublish(ApplicationEventevent) {
this.applicationContext.publishEvent(event);
    }
//生命周期,停止,销毁eurekaServer@Overridepublicvoidstop() {
this.running=false;
eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }
@OverridepublicbooleanisRunning() {
returnthis.running;
    }
@OverridepublicintgetPhase() {
return0;
    }
@OverridepublicbooleanisAutoStartup() {
returntrue;
    }
@Overridepublicvoidstop(Runnablecallback) {
callback.run();
    }
@OverridepublicintgetOrder() {
returnthis.order;
    }
}

EurekaServerInitializerConfiguration通过starter初始化和启动eureka,并抛出两个事件:EurekaRegistryAvailableEvent服务注册事件,EurekaServerStartedEvent服务启动事件,EurekaServer初始化核心的代码在eurekaServerBootstrap.contextInitialized

6.EurekaServerBootstrap

/*** @author Spencer Gibb*/publicclassEurekaServerBootstrap {
privatestaticfinalLoglog=LogFactory.getLog(EurekaServerBootstrap.class);
privatestaticfinalStringTEST="test";
privatestaticfinalStringARCHAIUS_DEPLOYMENT_ENVIRONMENT="archaius.deployment.environment";
privatestaticfinalStringEUREKA_ENVIRONMENT="eureka.environment";
privatestaticfinalStringDEFAULT="default";
privatestaticfinalStringARCHAIUS_DEPLOYMENT_DATACENTER="archaius.deployment.datacenter";
privatestaticfinalStringEUREKA_DATACENTER="eureka.datacenter";
protectedEurekaServerConfigeurekaServerConfig;
protectedApplicationInfoManagerapplicationInfoManager;
protectedEurekaClientConfigeurekaClientConfig;
protectedPeerAwareInstanceRegistryregistry;
protectedvolatileEurekaServerContextserverContext;
protectedvolatileAwsBinderawsBinder;
publicEurekaServerBootstrap(ApplicationInfoManagerapplicationInfoManager,
EurekaClientConfigeurekaClientConfig, EurekaServerConfigeurekaServerConfig,
PeerAwareInstanceRegistryregistry, EurekaServerContextserverContext) {
this.applicationInfoManager=applicationInfoManager;
this.eurekaClientConfig=eurekaClientConfig;
this.eurekaServerConfig=eurekaServerConfig;
this.registry=registry;
this.serverContext=serverContext;
    }
//Eureka初始化publicvoidcontextInitialized(ServletContextcontext) {
try {
//初始化环境initEurekaEnvironment();
//初始化上下文initEurekaServerContext();
//设置上下文属性context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
catch (Throwablee) {
log.error("Cannot bootstrap eureka server :", e);
thrownewRuntimeException("Cannot bootstrap eureka server :", e);
        }
    }
//eureka上下文销毁publicvoidcontextDestroyed(ServletContextcontext) {
try {
log.info("Shutting down Eureka Server..");
context.removeAttribute(EurekaServerContext.class.getName());
destroyEurekaServerContext();
destroyEurekaEnvironment();
        }
catch (Throwablee) {
log.error("Error shutting down eureka", e);
        }
log.info("Eureka Service is now shutdown...");
    }
//初始化环境,设置一些环境参数protectedvoidinitEurekaEnvironment() throwsException {
log.info("Setting the eureka configuration..");
//设置数据中心StringdataCenter=ConfigurationManager.getConfigInstance()
                .getString(EUREKA_DATACENTER);
if (dataCenter==null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        }
else {
ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
        }
//设置Eureka环境Stringenvironment=ConfigurationManager.getConfigInstance()
                .getString(EUREKA_ENVIRONMENT);
if (environment==null) {
ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
        }
else {
ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
        }
    }
//初始化eurekaServer上下文protectedvoidinitEurekaServerContext() throwsException {
// For backward compatibilityJsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder=newAwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
        }
//把EurekaServerContext设置到EurekaServerContextHolder中EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现intregistryCount=this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.//注册所有监视统计信息。EurekaMonitors.registerAllStats();
    }   
/*** Server context shutdown hook. Override for custom logic*/protectedvoiddestroyEurekaServerContext() throwsException {
EurekaMonitors.shutdown();
if (this.awsBinder!=null) {
this.awsBinder.shutdown();
        }
if (this.serverContext!=null) {
this.serverContext.shutdown();
        }
    }
/*** Users can override to clean up the environment themselves.*/protectedvoiddestroyEurekaEnvironment() throwsException {
    }
protectedbooleanisAws(InstanceInfoselfInstanceInfo) {
booleanresult=DataCenterInfo.Name.Amazon==selfInstanceInfo                .getDataCenterInfo().getName();
log.info("isAws returned "+result);
returnresult;
    }
}

EurekaServerBootstrap 的contextInitialized方法中做了两个事情

  • 通过initEurekaEnvironment();方法初始化环境,通过ConfigurationManager设置环境相关的参数
  • 通过initEurekaServerContext();初始化上下文,使用PeerAwareInstanceRegistryImpl.syncUp从相邻的eureka节点复制注册表

7.JerseyFilter

在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求

/*** Register the Jersey filter*/@BeanpublicFilterRegistrationBeanjerseyFilterRegistration(
javax.ws.rs.core.ApplicationeurekaJerseyApp) {
FilterRegistrationBeanbean=newFilterRegistrationBean();
bean.setFilter(newServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX+"/*"));
returnbean;
}

通过FilterRegistrationBean来注册filter,其核心逻辑是交给 ServletContainer 来完成的

publicclassServletContainerextendsHttpServletimplementsFilter {
    ...省略...
}

二.EurekaServer初始化流程

这里我们整理一下EurekaServer启动时是以什么样的流程进行初始化的,下面是根据Eureka Server启动断点跟踪出来的流程

1.ServletContainer 初始化

首先ServletContainer 会被创建并进行初始化,调用configure方法进行配置,至于 doFilter方法会在接受到请求时被执行

publicclassServletContainerextendsHttpServletimplementsFilter {
    ...省略...
publicvoidinit(FilterConfigfilterConfig) throwsServletException {
this.filterConfig=filterConfig;
this.init((WebConfig)(newWebFilterConfig(filterConfig)));
    }
protectedvoidconfigure(WebConfigwc, ResourceConfigrc, WebApplicationwa) {
if (this.getServletConfig() !=null) {
this.configure(this.getServletConfig(), rc, wa);
        } elseif (this.filterConfig!=null) {
this.configure(this.filterConfig, rc, wa);
        }
if (rcinstanceofReloadListener) {
List<ContainerNotifier>notifiers=newArrayList();
Objecto=rc.getProperties().get("com.sun.jersey.spi.container.ContainerNotifier");
Iteratori$;
if (oinstanceofContainerNotifier) {
notifiers.add((ContainerNotifier)o);
            } elseif (oinstanceofList) {
i$= ((List)o).iterator();
while(i$.hasNext()) {
Objectelem=i$.next();
if (eleminstanceofContainerNotifier) {
notifiers.add((ContainerNotifier)elem);
                    }
                }
            }
i$=ServiceFinder.find(ContainerNotifier.class).iterator();
while(i$.hasNext()) {
ContainerNotifiercn= (ContainerNotifier)i$.next();
notifiers.add(cn);
            }
rc.getProperties().put("com.sun.jersey.spi.container.ContainerNotifier", notifiers);
        }
    }
protectedvoidconfigure(FilterConfigfc, ResourceConfigrc, WebApplicationwa) {
rc.getSingletons().add(newServletContainer.ContextInjectableProvider(FilterConfig.class, fc));
Stringregex= (String)rc.getProperty("com.sun.jersey.config.property.WebPageContentRegex");
if (regex!=null&&regex.length() >0) {
try {
this.staticContentPattern=Pattern.compile(regex);
            } catch (PatternSyntaxExceptionvar6) {
thrownewContainerException("The syntax is invalid for the regular expression, "+regex+", associated with the initialization parameter "+"com.sun.jersey.config.property.WebPageContentRegex", var6);
            }
        }
this.forwardOn404=rc.getFeature("com.sun.jersey.config.feature.FilterForwardOn404");
this.filterContextPath=this.filterConfig.getInitParameter("com.sun.jersey.config.feature.FilterContextPath");
if (this.filterContextPath!=null) {
if (this.filterContextPath.isEmpty()) {
this.filterContextPath=null;
            } else {
if (!this.filterContextPath.startsWith("/")) {
this.filterContextPath='/'+this.filterContextPath;
                }
if (this.filterContextPath.endsWith("/")) {
this.filterContextPath=this.filterContextPath.substring(0, this.filterContextPath.length() -1);
                }
            }
        }
    }
    ...省略...
}

2.Eureka上下文初始化

紧接着EureakServerContext的initialize方法被调用,该方法有 @PostConstruct注解决定了它是初始化方法

@SingletonpublicclassDefaultEurekaServerContextimplementsEurekaServerContext {
@PostConstruct@Overridepublicvoidinitialize() {
logger.info("Initializing ...");
//开始集群节点更新peerEurekaNodes.start();
try {
//服务注册器初始化registry.init(peerEurekaNodes);
        } catch (Exceptione) {
thrownewRuntimeException(e);
        }
logger.info("Initialized");
    }

这里做了两个事情

  • 调用 peerEurekaNodes.start(); 定时更新Eureka集群中的节点
  • 调用服务注册器PeerAwareInstanceRegistryImpl的初始化init

3.启动PeerEurekaNodes集群节点更新

PeerEurekaNodes.start被调用,这里通过定时器定时更新Eureka集群节点,默认10m/次

@SingletonpublicclassPeerEurekaNodes {
//开始publicvoidstart() {
//创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器taskExecutor=Executors.newSingleThreadScheduledExecutor(
newThreadFactory() {
@OverridepublicThreadnewThread(Runnabler) {
Threadthread=newThread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
returnthread;
                    }
                }
        );
try {
//更新集群中的节点中的注册信息updatePeerEurekaNodes(resolvePeerUrls());
//创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());RunnablepeersUpdateTask=newRunnable() {
@Overridepublicvoidrun() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwablee) {
logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
//定时任务taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
//定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTESserverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS            );
        } catch (Exceptione) {
thrownewIllegalStateException(e);
        }
for (PeerEurekaNodenode : peerEurekaNodes) {
logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
}

定时调用updatePeerEurekaNodes方法更新集群,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutdown做关闭操作,新的节点调用createPeerEurekaNode进行创建,集群节点最终存储在List结构中

4.服务注册器初始化

DefaultEurekaServerContext 中调用完peerEurekaNodes.start();方法后调用PeerAwareInstanceRegistryImpl.init方法进行注册器的初始化

//初始化方法@Overridepublicvoidinit(PeerEurekaNodespeerEurekaNodes) throwsException {
//最后一分钟的复制次数定时器Timer开始this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes=peerEurekaNodes;
//初始化 ResponseCache(ResponseCacheImpl) ,负责缓存客户端查询的注册表信息initializedResponseCache();
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新scheduleRenewalThresholdUpdateTask();
//初始化远程注册表,默认么有远程RegioninitRemoteRegionRegistry();
try {
//注册到对象监视器Monitors.registerObject(this);
    } catch (Throwablee) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

这里我们主要分析两个东西

  • initializedResponseCache 初始化注册表响应缓存
  • scheduleRenewalThresholdUpdateTask 定时更新续约阈值

initializedResponseCache初始化响应缓存

注意:这里有这么一句代码initializedResponseCache,它初始化了一个ResponseCache 响应缓存,ResponseCacheImpl是具体实现,该类中构造了一个readWriteCacheMap读写缓存的Map,和一个只读缓存readOnlyCacheMap的Map。为什么是响应缓存,以为客户端在获取服务注册表的时候就会从readOnlyCacheMap缓存中去获取

publicclassResponseCacheImplimplementsResponseCache {
    ...省略...
//只读缓存privatefinalConcurrentMap<Key, Value>readOnlyCacheMap=newConcurrentHashMap<Key, Value>();
//读写缓存privatefinalLoadingCache<Key, Value>readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfigserverConfig, ServerCodecsserverCodecs, AbstractInstanceRegistryregistry) {
this.serverConfig=serverConfig;
this.serverCodecs=serverCodecs;
//获取配置,是否是只读缓存,默认true,拉取注册表的时候还会从只读缓存拉取this.shouldUseReadOnlyResponseCache=serverConfig.shouldUseReadOnlyResponseCache();
this.registry=registry;
//获取响应缓存更新时间间隔 30slongresponseCacheUpdateIntervalMs=serverConfig.getResponseCacheUpdateIntervalMs();
//构建一个 readWriteCacheMap this.readWriteCacheMap=CacheBuilder.newBuilder().initialCapacity(1000)
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(newRemovalListener<Key, Value>() {
@OverridepublicvoidonRemoval(RemovalNotification<Key, Value>notification) {
KeyremovedKey=notification.getKey();
if (removedKey.hasRegions()) {
KeycloneWithNoRegions=removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(newCacheLoader<Key, Value>() {
@OverridepublicValueload(Keykey) throwsException {
if (key.hasRegions()) {
KeycloneWithNoRegions=key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
Valuevalue=generatePayload(key);
returnvalue;
                }
            });
//如果使用只读响应缓存,if (shouldUseReadOnlyResponseCache) {
//每隔responseCacheUpdateIntervalMs=30s执行getCacheUpdateTasktimer.schedule(getCacheUpdateTask(),
newDate(((System.currentTimeMillis() /responseCacheUpdateIntervalMs) *responseCacheUpdateIntervalMs)
+responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
        }
try {
Monitors.registerObject(this);
        } catch (Throwablee) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }
privateTimerTaskgetCacheUpdateTask() {
returnnewTimerTask() {
@Overridepublicvoidrun() {
//如果数据不一致,从readWriteCacheMap缓存更新readOnlyCacheMap缓存logger.debug("Updating the client cache from response cache");
for (Keykey : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
try {
CurrentRequestVersion.set(key.getVersion());
ValuecacheValue=readWriteCacheMap.get(key);
ValuecurrentCacheValue=readOnlyCacheMap.get(key);
if (cacheValue!=currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwableth) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}

scheduleRenewalThresholdUpdateTask 定时更新续约阈值

定时任务每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值

/**每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值* Schedule the task that updates <em>renewal threshold</em> periodically.* The renewal threshold would be used to determine if the renewals drop* dramatically because of network partition and to protect expiring too* many instances at a time.* */privatevoidscheduleRenewalThresholdUpdateTask() {
//定时任务timer.schedule(newTimerTask() {
@Overridepublicvoidrun() {
//更新续约阈值updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());    //900s}

updateRenewalThreshold是具体的更新逻辑

// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()/*** Updates the <em>renewal threshold</em> based on the current number of* renewals. The threshold is a percentage as specified in* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals* received per minute {@link #getNumOfRenewsInLastMin()}.*/privatevoidupdateRenewalThreshold() {
try {
//获取到注册表Applicationsapps=eurekaClient.getApplications();
intcount=0;
// 计算有多少个注册的服务实例for (Applicationapp : apps.getRegisteredApplications()) {
for (InstanceInfoinstance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
                }
            }
        }
//枷锁synchronized (lock) {
// Update threshold only if the threshold is greater than the// current expected threshold of if the self preservation is disabled.// 只有当阀值大于当前预期值时或者关闭了自我保护模式才更新  if ((count*2) > (serverConfig.getRenewalPercentThreshold() *numberOfRenewsPerMinThreshold)
|| (!this.isSelfPreservationModeEnabled())) {
//判断如果阈值时候大于预期的阈值 或者 关闭了我保护//更新每分钟的预期续订次数:服务数 * 2 ,每个客户端30s/次,1分钟2次this.expectedNumberOfRenewsPerMin=count*2;
//更新每分钟阈值的续订次数 :服务数 * 2 * 0.85 (百分比阈值) this.numberOfRenewsPerMinThreshold= (int) ((count*2) *serverConfig.getRenewalPercentThreshold());
            }
        }
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwablee) {
logger.error("Cannot update renewal threshold", e);
    }
}

当关闭自我保护,或者当前阈值大于预期阈值,就会更新续约的阈值,那么这是怎么样的一个更新算法呢?

  • 每分钟的预期续订次数 = 服务数 * 2 ,因为: 一个服务30s/一次续约
  • 每分钟阈值 = 服务数 * 2 * 0.85

5.EurekaServer初始化配置

EurekaServerInitializerConfiguration 的start方法会在Spring容器刷新的时候调用,因为它实现了SmartLifecycle接口 , start方法中新开线程调用eurekaServerBootstrap.contextInitialized进行初始化

publicvoidstart() {
newThread(newRunnable() {
@Overridepublicvoidrun() {
try {
//TODO: is this class even needed now?eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(newEurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running=true;
publish(newEurekaServerStartedEvent(getEurekaServerConfig()));
            }
catch (Exceptionex) {
// Help!log.error("Could not initialize Eureka servlet context", ex);
            }
        }
    }).start();
}

6.Eureka启动引导

EurekaServerBootstrap .contextInitialized 负责初始化Eureak环境和初始化上下文

//Eureka初始化publicvoidcontextInitialized(ServletContextcontext) {
try {
//初始化环境initEurekaEnvironment();
//初始化上下文initEurekaServerContext();
//设置上下文属性context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
catch (Throwablee) {
log.error("Cannot bootstrap eureka server :", e);
thrownewRuntimeException("Cannot bootstrap eureka server :", e);
    }
}

在初始化上下文的时候会调用 PeerAwareInstanceRegistryImpl.syncUp(); 从相邻的集群节点同步注册表,通过PeerAwareInstanceRegistryImpl.register注册到当前Eureka节点

//初始化eurekaServer上下文protectedvoidinitEurekaServerContext() throwsException {
    ...省略...
//把EurekaServerContext设置到EurekaServerContextHolder中EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现intregistryCount=this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.//注册所有监视统计信息。EurekaMonitors.registerAllStats();
}

同步相邻节点的注册表PeerAwareInstanceRegistryImpl.syncUp()

@OverridepublicintsyncUp() {
// Copy entire entry from neighboring DS nodeintcount=0;
//getRegistrySyncRetries重试次数默认5次for (inti=0; ((i<serverConfig.getRegistrySyncRetries()) && (count==0)); i++) {
if (i>0) {
try {
//通信中断,等待下一次切换实例Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedExceptione) {
logger.warn("Interrupted during registry transfer..");
break;
            }
        }
//获取注册表Applicationsapps=eurekaClient.getApplications();
//循环服务列表,依次注册for (Applicationapp : apps.getRegisteredApplications()) {
for (InstanceInfoinstance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
                    }
                } catch (Throwablet) {
logger.error("During DS init copy", t);
                }
            }
        }
    }
returncount;
}

到这里EurekaServer就算初始化完成了

总结

目录
相关文章
|
7天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
23 2
|
23天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
13天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
39 9
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
111 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
128 9
|
1月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
1月前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
35 1
|
1月前
|
存储 开发框架 Java
什么是Spring?什么是IOC?什么是DI?IOC和DI的关系? —— 零基础可无压力学习,带源码
文章详细介绍了Spring、IOC、DI的概念和关系,解释了控制反转(IOC)和依赖注入(DI)的原理,并提供了IOC的代码示例,阐述了Spring框架作为IOC容器的应用。
31 0
什么是Spring?什么是IOC?什么是DI?IOC和DI的关系? —— 零基础可无压力学习,带源码
|
2月前
|
缓存 Java Spring
手写Spring Ioc 循环依赖底层源码剖析
在Spring框架中,IoC(控制反转)是一个核心特性,它通过依赖注入(DI)实现了对象间的解耦。然而,在实际开发中,循环依赖是一个常见的问题。
40 4
下一篇
无影云桌面