前言
上一章我们分析了EurekaServer的自动配置,这一章我们来详细分析Eureka Server中的核心组件以及初始化流程。
一.Eureka Server 核心组件介绍
1.EurekaServerContext
Eureka服务端上下文对象,包含了初始化,关闭,获取服务配置,获取集群节点,获取服务注册器,获取服务信息管理器等方法,默认实现类是DefaultEurekaServerContext
publicinterfaceEurekaServerContext { //初始化voidinitialize() throwsException; //关闭voidshutdown() throwsException; //获取服务配置EurekaServerConfiggetServerConfig(); //获取集群节点管理管理类PeerEurekaNodesgetPeerEurekaNodes(); //服务器编解码器ServerCodecsgetServerCodecs(); //服务注册器PeerAwareInstanceRegistrygetRegistry(); //instanceInfo实例信息管理器ApplicationInfoManagergetApplicationInfoManager(); }
DefaultEurekaServerContext实现类代码
/*** Represent the local server context and exposes getters to components of the* local server such as the registry.** @author David Liu*/publicclassDefaultEurekaServerContextimplementsEurekaServerContext { privatestaticfinalLoggerlogger=LoggerFactory.getLogger(DefaultEurekaServerContext.class); privatefinalEurekaServerConfigserverConfig; privatefinalServerCodecsserverCodecs; privatefinalPeerAwareInstanceRegistryregistry; privatefinalPeerEurekaNodespeerEurekaNodes; privatefinalApplicationInfoManagerapplicationInfoManager; publicDefaultEurekaServerContext(EurekaServerConfigserverConfig, ServerCodecsserverCodecs, PeerAwareInstanceRegistryregistry, PeerEurekaNodespeerEurekaNodes, ApplicationInfoManagerapplicationInfoManager) { this.serverConfig=serverConfig; this.serverCodecs=serverCodecs; this.registry=registry; this.peerEurekaNodes=peerEurekaNodes; this.applicationInfoManager=applicationInfoManager; } // @PostConstruct :EurekaServerContext初始化的时候initialize方法被执行,调用 peerEurekaNodes.start();开启EurekaServer的初始化,//然后再调用 peerAwareInstanceRegistry的.init(peerEurekaNodes);方法初始化publicvoidinitialize() { logger.info("Initializing ..."); //PeerEurekaNodes开始初始化peerEurekaNodes.start(); try { //peerAwareInstanceRegistry开始初始化registry.init(peerEurekaNodes); } catch (Exceptione) { thrownewRuntimeException(e); } logger.info("Initialized"); } //EurekaServerContext销毁之前(@PreDestroy)调用shutdown,//peerAwareInstanceRegistry 注册器的shutdown执行关闭流程publicvoidshutdown() { logger.info("Shutting down ..."); //服务注册器关闭registry.shutdown(); //peerEurekaNodes集群节点关闭peerEurekaNodes.shutdown(); logger.info("Shut down"); } ...省略... }
DefaultEurekaServerContext
的initialize初始化方法中做的事情就是在初始化的时候,调用peerEurekaNodes.start();
初始化集群节点, 调用PeerAwareInstanceRegistry.init
初始化注册器,在shutdown
销毁方法中调用PeerAwareInstanceRegistry.shutdown
执行注册器的关闭流程,调用peerEurekaNodes.shutdown
执行集群节点的关闭
2.PeerEurekaNodes
PeerEurekaNodes
用来管理Eureka集群节点PeerEurekaNode
生命周期的工具类,其start方法在DefaultEurekaServerContext
的initialize初始化方法中被调用,源码如下
/*** Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.** @author Tomasz Bak*/publicclassPeerEurekaNodes { privatestaticfinalLoggerlogger=LoggerFactory.getLogger(PeerEurekaNodes.class); //服务注册接口protectedfinalPeerAwareInstanceRegistryregistry; //服务端配置对象protectedfinalEurekaServerConfigserverConfig; //客户端配置protectedfinalEurekaClientConfigclientConfig; protectedfinalServerCodecsserverCodecs; //InstanceInfo实例管理器privatefinalApplicationInfoManagerapplicationInfoManager; //Eureka集群节点集合privatevolatileList<PeerEurekaNode>peerEurekaNodes=Collections.emptyList(); //Eureka集群节点的url集合privatevolatileSet<String>peerEurekaNodeUrls=Collections.emptySet(); //定时任务执行器privateScheduledExecutorServicetaskExecutor; //初始化节点工具publicPeerEurekaNodes( PeerAwareInstanceRegistryregistry, EurekaServerConfigserverConfig, EurekaClientConfigclientConfig, ServerCodecsserverCodecs, ApplicationInfoManagerapplicationInfoManager) { this.registry=registry; this.serverConfig=serverConfig; this.clientConfig=clientConfig; this.serverCodecs=serverCodecs; this.applicationInfoManager=applicationInfoManager; } //获取集群节点集合,不可修改publicList<PeerEurekaNode>getPeerNodesView() { returnCollections.unmodifiableList(peerEurekaNodes); } //获取集群节点集合publicList<PeerEurekaNode>getPeerEurekaNodes() { returnpeerEurekaNodes; } //此实例提供对等复制实例的最小数量,被认为是健康的publicintgetMinNumberOfAvailablePeers() { returnserverConfig.getHealthStatusMinNumberOfAvailablePeers(); } //开始publicvoidstart() { //创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器taskExecutor=Executors.newSingleThreadScheduledExecutor( newThreadFactory() { publicThreadnewThread(Runnabler) { Threadthread=newThread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); returnthread; } } ); try { //更新集群中的节点中的注册信息updatePeerEurekaNodes(resolvePeerUrls()); //创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());RunnablepeersUpdateTask=newRunnable() { publicvoidrun() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwablee) { logger.error("Cannot update 
the replica Nodes", e); } } }; //taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), //定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTESserverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exceptione) { thrownewIllegalStateException(e); } for (PeerEurekaNodenode : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } } //关闭,关闭节点更新的定时任务,清空peerEurekaNodes ,peerEurekaNodeUrls ,调用每个节点的shutDown方法publicvoidshutdown() { taskExecutor.shutdown(); List<PeerEurekaNode>toRemove=this.peerEurekaNodes; this.peerEurekaNodes=Collections.emptyList(); this.peerEurekaNodeUrls=Collections.emptySet(); for (PeerEurekaNodenode : toRemove) { node.shutDown(); } } /**基于相同的Zone得到Eureka集群中多个节点的url,过滤掉当前节点* Resolve peer URLs.** @return peer URLs with node's own URL filtered out*/protectedList<String>resolvePeerUrls() { InstanceInfomyInfo=applicationInfoManager.getInfo(); Stringzone=InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); //配置的eureka地址urlList<String>replicaUrls=EndpointUtils .getDiscoveryServiceUrls(clientConfig, zone, newEndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); intidx=0; while (idx<replicaUrls.size()) { //移除当前eureka节点的urlif (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } returnreplicaUrls; } /**更新集群节点列表的方法,在定时器中被执行,newPeerUrls是集群中的eureka server节点的url,过滤了本地节点的url做法是删除老的不可用的节点调用shutDown方法,使用createPeerEurekaNode创建新的节点添加新的节点* Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and* create new ones.** @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out*///修改集群节点protectedvoidupdatePeerEurekaNodes(List<String>newPeerUrls) { if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. 
Check the route 53 DNS Registry"); return; } //需要关闭的节点Set<String>toShutdown=newHashSet<>(peerEurekaNodeUrls); //移除掉新的节点,新的节点不需要关闭toShutdown.removeAll(newPeerUrls); //新的节点需要添加Set<String>toAdd=newHashSet<>(newPeerUrls); //新的节点中移除老的节点toAdd.removeAll(peerEurekaNodeUrls); if (toShutdown.isEmpty() &&toAdd.isEmpty()) { // No changereturn; } // Remove peers no long available 移除不可用的节点//节点集合,本地缓存的所有节点List<PeerEurekaNode>newNodeList=newArrayList<>(peerEurekaNodes); //如果需要关闭的节点集合不为空if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); inti=0; while (i<newNodeList.size()) { PeerEurekaNodeeurekaNode=newNodeList.get(i); //如果当前节点需要关闭,包含在toShutdown中if (toShutdown.contains(eurekaNode.getServiceUrl())) { //从newNodeList中移除掉newNodeList.remove(i); //执行节点的关闭方法eurekaNode.shutDown(); } else { i++; } } } // Add new peers 如果需要添加新的节点if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (StringpeerUrl : toAdd) { //调用 createPeerEurekaNode 创建新的节点,添加到节点集合中newNodeList.add(createPeerEurekaNode(peerUrl)); } } this.peerEurekaNodes=newNodeList; this.peerEurekaNodeUrls=newHashSet<>(newPeerUrls); } //创建集群节点PeerEurekaNode protectedPeerEurekaNodecreatePeerEurekaNode(StringpeerEurekaNodeUrl) { HttpReplicationClientreplicationClient=JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl); StringtargetHost=hostFromUrl(peerEurekaNodeUrl); if (targetHost==null) { targetHost="host"; } returnnewPeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig); } ...省略...
PeerEurekaNodes
主要定义了eureka
集群节点更新逻辑,通过定时任务定时更新,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutDown
做关闭操作,新的节点调用createPeerEurekaNode
进行创建,集群节点最终存储在List<PeerEurekaNode>
结构中
3.PeerAwareInstanceRegistry
PeerAwareInstanceRegistry
翻译为“对等感知实例注册表”,其实就是服务注册器,只是这个注册器会考虑集群中其它节点的数据同步,其Bean定义如下:
publicPeerAwareInstanceRegistrypeerAwareInstanceRegistry( ServerCodecsserverCodecs) { this.eurekaClient.getApplications(); // force initializationreturnnewInstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); }
看一下继承关系:
- LookupService:服务查找接口,提供了服务查找的方法,在之前有介绍过,客户端的EurekaClient继承了该接口,服务端的InstanceRegistry也继承了该接口
publicinterfaceLookupService<T> { //获取应用ApplicationgetApplication(Stringvar1); //获取应用注册表ApplicationsgetApplications(); //获取实例信息列表List<InstanceInfo>getInstancesById(Stringvar1); //获取下一台服务实例信息InstanceInfogetNextServerFromEureka(Stringvar1, booleanvar2); }
- LeaseManager:租约管理器接口,提供了register注册,cancel取消,renew续约,evict过期等服务相关的操作
/**
 * Lease operations: register, cancel, renew and evict.
 */
public interface LeaseManager<T> {

    /** Registers {@code r} with the given lease duration. */
    void register(T r, int leaseDuration, boolean isReplication);

    /** Cancels (takes offline) the lease identified by application name and id. */
    boolean cancel(String appName, String id, boolean isReplication);

    /** Renews the lease identified by application name and id. */
    boolean renew(String appName, String id, boolean isReplication);

    /** Evicts expired leases. */
    void evict();
}
- InstanceRegistry:应用实例注册表接口继承了 LookupService 、LeaseManager ,提供应用实例的注册与发现服务,
publicinterfaceInstanceRegistryextendsLeaseManager<InstanceInfo>, LookupService<String> { //允许开始传输数据voidopenForTraffic(ApplicationInfoManagerapplicationInfoManager, intcount); //关闭voidshutdown(); voidstoreOverriddenStatusIfRequired(Stringid, InstanceStatusoverriddenStatus); //存储实例 覆盖状态:使用的是InstanceInfo.InstanceStatus overriddenStatus 覆盖状态,//使用该状态来修改注册中心服务的注册状态· ·voidstoreOverriddenStatusIfRequired(StringappName, Stringid, InstanceStatusoverriddenStatus); //更新服务注册状态booleanstatusUpdate(StringappName, Stringid, InstanceStatusnewStatus, StringlastDirtyTimestamp, booleanisReplication); //删除覆盖的状态booleandeleteStatusOverride(StringappName, Stringid, InstanceStatusnewStatus, StringlastDirtyTimestamp, booleanisReplication); //服务状态快照Map<String, InstanceStatus>overriddenInstanceStatusesSnapshot(); //获取本地服务注册表,从本地ConcurrentHashMap缓存的服务注册表中获取ApplicationsgetApplicationsFromLocalRegionOnly(); //获取服务注册表List<Application>getSortedApplications(); //根据名字获取服务ApplicationgetApplication(StringappName, booleanincludeRemoteRegion); //根据名字和id获取实例信息InstanceInfogetInstanceByAppAndId(StringappName, Stringid); //根据名字和id获取实例信息InstanceInfogetInstanceByAppAndId(StringappName, Stringid, booleanincludeRemoteRegions); //完全清除注册表//overriddenInstanceStatusMap.clear(); 覆盖状态清除//recentCanceledQueue.clear(); 最近取消队列//recentRegisteredQueue.clear(); 最近注册队列//recentlyChangedQueue.clear(); 最近更改队列//registry.clear(); 清除注册表voidclearRegistry(); //初始化的响应缓存voidinitializedResponseCache(); //获取响应缓存ResponseCachegetResponseCache(); //最后一分钟续约次数,用作自我保护计算值longgetNumOfRenewsInLastMin(); //获取每分钟续约次数,用作自我保护计算值intgetNumOfRenewsPerMinThreshold(); //检查续订次数是否小于阈值。intisBelowRenewThresold(); //最近注册的实例List<Pair<Long, String>>getLastNRegisteredInstances(); //最近取消的实例List<Pair<Long, String>>getLastNCanceledInstances(); //最近过期的实例booleanisLeaseExpirationEnabled(); //是否开启自我保护booleanisSelfPreservationModeEnabled(); }
- AbstractInstanceRegistry:InstanceRegistry的实现类,应用对象注册表抽象,处理客户端的注册请求,包括 register注册,Renewals续约,Cancels下线,Expirations过期,Status Changes状态改变,服务注册表以增量的方式增加
- PeerAwareInstanceRegistry: InstanceRegistry的子接口,应用对象注册表接口,实现了 Eureka-Server 集群内注册信息同步功能
publicinterfacePeerAwareInstanceRegistryextendsInstanceRegistry { //初始化PeerEurekaNodes 集群节点voidinit(PeerEurekaNodespeerEurekaNodes) throwsException; //注册表信息同步, 如果节点之间通信失败,列表中耗尽该操作故障转移到其他节点intsyncUp(); //检查是否有访问权限booleanshouldAllowAccess(booleanremoteRegionRequired); //注册InstanceInfo到其他Eureka节点voidregister(InstanceInfoinfo, booleanisReplication); //修改状态voidstatusUpdate(finalStringasgName, finalASGResource.ASGStatusnewStatus, finalbooleanisReplication); }
- PeerAwareInstanceRegistryImpl:PeerAwareInstanceRegistry的实现类,应用对象注册的具体实现,同时继承了AbstractInstanceRegistry
- InstanceRegistry :PeerAwareInstanceRegistryImpl的子类,
有些实现类这里没有展开,后面会详细分析
4.PeerAwareInstanceRegistryImpl
服务注册器,继承AbstractInstanceRegistry抽象类, 实现 PeerAwareInstanceRegistry服务注册接口,包含了服务注册,续约,下线,过期,状态改变等等功能。
/**集群之间节点同步的服务注册器, 所有操作都在其父类 AbstractInstanceRegistry 中,* Handles replication of all operations to {@link AbstractInstanceRegistry} to peer* <em>Eureka</em> nodes to keep them all in sync.** <p>* 主要操作是副本的注册,续约,取消,到期和状态更改* Primary operations that are replicated are the* <em>Registers,Renewals,Cancels,Expirations and Status Changes</em>* </p>** <p>* 当eureka服务器启动时,它将尝试从对等的eureka节点获取所有注册表信息,如果由于某种原因该操作失败,* 则服务器将不允许用户在指定的时间段内获取注册表信息。* When the eureka server starts up it tries to fetch all the registry* information from the peer eureka nodes.If for some reason this operation* fails, the server does not allow the user to get the registry information for* a period specified in* {@link com.netflix.eureka.EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}.* </p>** <p>* *关于续约的重要注意事项。如果续约失败次数超过EurekaServerConfig.getRenewalPercentThreshold()中指定的指定阈值,则在EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()时间内,eureka将其视为危险,并停止实例过期* * One important thing to note about <em>renewals</em>.If the renewal drops more* than the specified threshold as specified in* {@link com.netflix.eureka.EurekaServerConfig#getRenewalPercentThreshold()} within a period of* {@link com.netflix.eureka.EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()}, eureka* perceives this as a danger and stops expiring instances.* </p>** @author Karthik Ranganathan, Greg Kim**/publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry { privatestaticfinalLoggerlogger=LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class); privatestaticfinalStringUS_EAST_1="us-east-1"; privatestaticfinalintPRIME_PEER_NODES_RETRY_MS=30000; privatelongstartupTime=0; privatebooleanpeerInstancesTransferEmptyOnStartup=true; //把功能抽成枚举,心跳检查,注册,取消注册,状态改变,删除覆盖状态publicenumAction { Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride; privatecom.netflix.servo.monitor.Timertimer=Monitors.newTimer(this.name()); publiccom.netflix.servo.monitor.TimergetTimer() { returnthis.timer; 
} } privatestaticfinalComparator<Application>APP_COMPARATOR=newComparator<Application>() { publicintcompare(Applicationl, Applicationr) { returnl.getName().compareTo(r.getName()); } }; privatefinalMeasuredRatenumberOfReplicationsLastMin; //客户端protectedfinalEurekaClienteurekaClient; //集群节点管理protectedvolatilePeerEurekaNodespeerEurekaNodes; privatefinalInstanceStatusOverrideRuleinstanceStatusOverrideRule; privateTimertimer=newTimer( "ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true); publicPeerAwareInstanceRegistryImpl( EurekaServerConfigserverConfig, EurekaClientConfigclientConfig, ServerCodecsserverCodecs, EurekaClienteurekaClient ) { super(serverConfig, clientConfig, serverCodecs); this.eurekaClient=eurekaClient; //最后一分钟的复制次数this.numberOfReplicationsLastMin=newMeasuredRate(1000*60*1); // We first check if the instance is STARTING or DOWN, then we check explicit overrides,// then we check the status of a potentially existing lease.this.instanceStatusOverrideRule=newFirstMatchWinsCompositeRule(newDownOrStartingRule(), newOverrideExistsRule(overriddenInstanceStatusMap), newLeaseExistsRule()); } protectedInstanceStatusOverrideRulegetInstanceInfoOverrideRule() { returnthis.instanceStatusOverrideRule; } //初始化方法publicvoidinit(PeerEurekaNodespeerEurekaNodes) throwsException { //最后一分钟的复制次数定时器Timer开始this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes=peerEurekaNodes; //初始化 ResponseCache ,负责缓存客户端查询的注册表信息 30s/1次initializedResponseCache(); //续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新scheduleRenewalThresholdUpdateTask(); //初始化远程注册表,默认么有远程RegioninitRemoteRegionRegistry(); try { //注册到对象监视器Monitors.registerObject(this); } catch (Throwablee) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } } /**执行所有清理和关闭操作。* Perform all cleanup and shutdown operations.*/publicvoidshutdown() { try { //注销对象监视DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this)); } catch (Throwablet) { logger.error("Cannot 
shutdown monitor registry", t); } try { //集群节点关闭peerEurekaNodes.shutdown(); } catch (Throwablet) { logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t); } //最后一分钟的复制次数定时器 Timer停止numberOfReplicationsLastMin.stop(); //执行所有清理和关闭操作。//deltaRetentionTimer.cancel(); 增量保留计时器//evictionTimer.cancel(); 服务剔除计时器//renewsLastMin.stop(); 最后一分钟的复制次数机器停止super.shutdown(); } //续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新privatevoidscheduleRenewalThresholdUpdateTask() { timer.schedule(newTimerTask() { publicvoidrun() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); } /**集群数据同步,从集群中eureka节点复制注册表信息。如果通信失败,此操作将故障转移到其他节点,直到列表用尽。* Populates the registry information from a peer eureka node. This* operation fails over to other nodes until the list is exhausted if the* communication fails.*/publicintsyncUp() { // Copy entire entry from neighboring DS nodeintcount=0; //getRegistrySyncRetries重试次数默认5次for (inti=0; ((i<serverConfig.getRegistrySyncRetries()) && (count==0)); i++) { if (i>0) { try { //通信中断,等待下一次切换实例Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedExceptione) { logger.warn("Interrupted during registry transfer.."); break; } } //获取注册表Applicationsapps=eurekaClient.getApplications(); //循环服务列表,依次注册for (Applicationapp : apps.getRegisteredApplications()) { for (InstanceInfoinstance : app.getInstances()) { try { if (isRegisterable(instance)) { //获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwablet) { logger.error("During DS init copy", t); } } } } returncount; } //运行开始传输数据publicvoidopenForTraffic(ApplicationInfoManagerapplicationInfoManager, intcount) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2.//每分钟的预期续订次数 2次,30s/一次续约this.expectedNumberOfRenewsPerMin=count*2; //每分钟续约次数阈值 = 
expectedNumberOfRenewsPerMin每分钟续约次数 * 85%//如果客户端续约低于这个阈值,将会开启服务端的自我保护功能this.numberOfRenewsPerMinThreshold= (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold()); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime=System.currentTimeMillis(); if (count>0) { this.peerInstancesTransferEmptyOnStartup=false; } DataCenterInfo.NameselfName=applicationInfoManager.getInfo().getDataCenterInfo().getName(); booleanisAws=Name.Amazon==selfName; if (isAws&&serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } //改变服务的状态为UPlogger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); //这里使用定时任务开启新的 服务剔除任务super.postInit(); } //取消注册,服务下线publicbooleancancel(finalStringappName, finalStringid, finalbooleanisReplication) { //调用父类的下线方法if (super.cancel(appName, id, isReplication)) { replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); synchronized (lock) { if (this.expectedNumberOfRenewsPerMin>0) { // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)//客户下线,降低续约阈值this.expectedNumberOfRenewsPerMin=this.expectedNumberOfRenewsPerMin-2; this.numberOfRenewsPerMinThreshold= (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold()); } } returntrue; } returnfalse; } //服务注册publicvoidregister(finalInstanceInfoinfo, finalbooleanisReplication) { intleaseDuration=Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() !=null&&info.getLeaseInfo().getDurationInSecs() >0) { leaseDuration=info.getLeaseInfo().getDurationInSecs(); } //调用父类的注册super.register(info, leaseDuration, isReplication); //注册信息同步到集群中其他节点replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } //续约publicbooleanrenew(finalStringappName, finalStringid, 
finalbooleanisReplication) { //调用父类的续约if (super.renew(appName, id, isReplication)) { //同步到集群中的其他节点replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); returntrue; } returnfalse; } //修改服务状态publicbooleanstatusUpdate(finalStringappName, finalStringid, finalInstanceStatusnewStatus, StringlastDirtyTimestamp, finalbooleanisReplication) { if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { //状态同步到其他节点replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication); returntrue; } returnfalse; } //删除状态publicbooleandeleteStatusOverride(StringappName, Stringid, InstanceStatusnewStatus, StringlastDirtyTimestamp, booleanisReplication) { if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication); returntrue; } returnfalse; } //是否启用租约到期publicbooleanisLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire.returntrue; } returnnumberOfRenewsPerMinThreshold>0&&getNumOfRenewsInLastMin() >numberOfRenewsPerMinThreshold; } //更新续约阈值privatevoidupdateRenewalThreshold() { try { Applicationsapps=eurekaClient.getApplications(); //统计有多少个实例intcount=0; for (Applicationapp : apps.getRegisteredApplications()) { for (InstanceInfoinstance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { //仅当阈值大于当前的预期阈值,或禁用了自我保留时才更新阈值。// Update threshold only if the threshold is greater than the// current expected threshold or if self preservation is disabled.if ((count*2) > (serverConfig.getRenewalPercentThreshold() *expectedNumberOfRenewsPerMin) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfRenewsPerMin=count*2; this.numberOfRenewsPerMinThreshold= (int) ((count*2) *serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", 
numberOfRenewsPerMinThreshold); } catch (Throwablee) { logger.error("Cannot update renewal threshold", e); } } /**集群之间的节点复制* Replicates all eureka actions to peer eureka nodes except for replication* traffic to this node.**/privatevoidreplicateToPeers(Actionaction, StringappName, Stringid, InstanceInfoinfo/* optional */, InstanceStatusnewStatus/* optional */, booleanisReplication) { Stopwatchtracer=action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes==Collections.EMPTY_LIST||isReplication) { return; } for (finalPeerEurekaNodenode : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } /**集群之间的节点复制* Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/privatevoidreplicateInstanceActionsToPeers(Actionaction, StringappName, Stringid, InstanceInfoinfo, InstanceStatusnewStatus, PeerEurekaNodenode) { try { InstanceInfoinfoFromRegistry=null; CurrentRequestVersion.set(Version.V2); switch (action) { caseCancel: node.cancel(appName, id); break; caseHeartbeat: InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id); infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; caseRegister: node.register(info); break; caseStatusUpdate: infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; caseDeleteStatusOverride: infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwablet) { logger.error("Cannot 
replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } } ....省略一些代码.... }
这个服务注册器实现类看起来很复杂,它做了哪些事情呢?
init初始化
:注册表缓存ResponseCache初始化,续约阈值定时更新任务初始化,初始化远程注册表shutdown
:执行所有清理和关闭操作syncUp
:集群之间的数据同步节点复制cancel
:服务下线,并同步到其他节点register
:服务注册,并同步到其他节点renew
: 续约,并同步到其他节点
5.EurekaServerInitializerConfiguration
EurekaServerAutoConfiguration
通过 @Import(EurekaServerInitializerConfiguration.class)
进行初始化,EurekaServerInitializerConfiguration
实现了SmartLifecycle
,其中的start
方法会在Spring启动过程中,执行LifecycleProcessor().onRefresh()
生命周期处理器刷新的时候被调用,然后再调用EurekaServerBootstrap.contextInitialized
进行初始化Eureka和启动Eureka
/*** @author Dave Syer*/publicclassEurekaServerInitializerConfigurationimplementsServletContextAware, SmartLifecycle, Ordered { privatestaticfinalLoglog=LogFactory.getLog(EurekaServerInitializerConfiguration.class); //EurekaServer 配置privateEurekaServerConfigeurekaServerConfig; //Servlet上下文privateServletContextservletContext; //应用上下文对象privateApplicationContextapplicationContext; //启动引导privateEurekaServerBootstrapeurekaServerBootstrap; privatebooleanrunning; privateintorder=1; //初始化Servlet上下文publicvoidsetServletContext(ServletContextservletContext) { this.servletContext=servletContext; } //开始方法,复写于 SmartLifecycle 在Spring启动的时候,该方法会被地调用,publicvoidstart() { newThread(newRunnable() { publicvoidrun() { try { //TODO: is this class even needed now?//初始化EurekaServer上下文,启动EurekaServereurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); //发布一个EurekaRegistryAvailableEvent注册事件publish(newEurekaRegistryAvailableEvent(getEurekaServerConfig())); //改变running状态trueEurekaServerInitializerConfiguration.this.running=true; //发布EurekaServer启动事件EurekaServerStartedEventpublish(newEurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exceptionex) { // Help!log.error("Could not initialize Eureka servlet context", ex); } } }).start(); } privateEurekaServerConfiggetEurekaServerConfig() { returnthis.eurekaServerConfig; } privatevoidpublish(ApplicationEventevent) { this.applicationContext.publishEvent(event); } //生命周期,停止,销毁eurekaServerpublicvoidstop() { this.running=false; eurekaServerBootstrap.contextDestroyed(this.servletContext); } publicbooleanisRunning() { returnthis.running; } publicintgetPhase() { return0; } publicbooleanisAutoStartup() { returntrue; } publicvoidstop(Runnablecallback) { callback.run(); } publicintgetOrder() { returnthis.order; } }
EurekaServerInitializerConfiguration
通过start方法初始化和启动eureka,并发布两个事件:EurekaRegistryAvailableEvent
注册表可用事件,EurekaServerStartedEvent
服务启动事件,EurekaServer初始化核心的代码在eurekaServerBootstrap.contextInitialized
中
6.EurekaServerBootstrap
/*** @author Spencer Gibb*/publicclassEurekaServerBootstrap { privatestaticfinalLoglog=LogFactory.getLog(EurekaServerBootstrap.class); privatestaticfinalStringTEST="test"; privatestaticfinalStringARCHAIUS_DEPLOYMENT_ENVIRONMENT="archaius.deployment.environment"; privatestaticfinalStringEUREKA_ENVIRONMENT="eureka.environment"; privatestaticfinalStringDEFAULT="default"; privatestaticfinalStringARCHAIUS_DEPLOYMENT_DATACENTER="archaius.deployment.datacenter"; privatestaticfinalStringEUREKA_DATACENTER="eureka.datacenter"; protectedEurekaServerConfigeurekaServerConfig; protectedApplicationInfoManagerapplicationInfoManager; protectedEurekaClientConfigeurekaClientConfig; protectedPeerAwareInstanceRegistryregistry; protectedvolatileEurekaServerContextserverContext; protectedvolatileAwsBinderawsBinder; publicEurekaServerBootstrap(ApplicationInfoManagerapplicationInfoManager, EurekaClientConfigeurekaClientConfig, EurekaServerConfigeurekaServerConfig, PeerAwareInstanceRegistryregistry, EurekaServerContextserverContext) { this.applicationInfoManager=applicationInfoManager; this.eurekaClientConfig=eurekaClientConfig; this.eurekaServerConfig=eurekaServerConfig; this.registry=registry; this.serverContext=serverContext; } //Eureka初始化publicvoidcontextInitialized(ServletContextcontext) { try { //初始化环境initEurekaEnvironment(); //初始化上下文initEurekaServerContext(); //设置上下文属性context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwablee) { log.error("Cannot bootstrap eureka server :", e); thrownewRuntimeException("Cannot bootstrap eureka server :", e); } } //eureka上下文销毁publicvoidcontextDestroyed(ServletContextcontext) { try { log.info("Shutting down Eureka Server.."); context.removeAttribute(EurekaServerContext.class.getName()); destroyEurekaServerContext(); destroyEurekaEnvironment(); } catch (Throwablee) { log.error("Error shutting down eureka", e); } log.info("Eureka Service is now shutdown..."); } //初始化环境,设置一些环境参数protectedvoidinitEurekaEnvironment() 
throwsException { log.info("Setting the eureka configuration.."); //设置数据中心StringdataCenter=ConfigurationManager.getConfigInstance() .getString(EUREKA_DATACENTER); if (dataCenter==null) { log.info( "Eureka data center value eureka.datacenter is not set, defaulting to default"); ConfigurationManager.getConfigInstance() .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT); } else { ConfigurationManager.getConfigInstance() .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter); } //设置Eureka环境Stringenvironment=ConfigurationManager.getConfigInstance() .getString(EUREKA_ENVIRONMENT); if (environment==null) { ConfigurationManager.getConfigInstance() .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST); log.info( "Eureka environment value eureka.environment is not set, defaulting to test"); } else { ConfigurationManager.getConfigInstance() .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment); } } //初始化eurekaServer上下文protectedvoidinitEurekaServerContext() throwsException { // For backward compatibilityJsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder=newAwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } //把EurekaServerContext设置到EurekaServerContextHolder中EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现intregistryCount=this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics.//注册所有监视统计信息。EurekaMonitors.registerAllStats(); } /*** Server context shutdown hook. 
Override for custom logic*/protectedvoiddestroyEurekaServerContext() throwsException { EurekaMonitors.shutdown(); if (this.awsBinder!=null) { this.awsBinder.shutdown(); } if (this.serverContext!=null) { this.serverContext.shutdown(); } } /*** Users can override to clean up the environment themselves.*/protectedvoiddestroyEurekaEnvironment() throwsException { } protectedbooleanisAws(InstanceInfoselfInstanceInfo) { booleanresult=DataCenterInfo.Name.Amazon==selfInstanceInfo .getDataCenterInfo().getName(); log.info("isAws returned "+result); returnresult; } }
EurekaServerBootstrap 的contextInitialized方法中做了两个事情
- 通过
initEurekaEnvironment();
方法初始化环境,通过ConfigurationManager
设置环境相关的参数 - 通过
initEurekaServerContext();
初始化上下文,使用PeerAwareInstanceRegistryImpl.syncUp
从相邻的eureka节点复制注册表
7.JerseyFilter
在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求
/*** Register the Jersey filter*/publicFilterRegistrationBeanjerseyFilterRegistration( javax.ws.rs.core.ApplicationeurekaJerseyApp) { FilterRegistrationBeanbean=newFilterRegistrationBean(); bean.setFilter(newServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX+"/*")); returnbean; }
通过FilterRegistrationBean来注册filter,其核心逻辑是交给 ServletContainer 来完成的
// Jersey container class: it is declared both as an HttpServlet and as a servlet Filter,
// which is why it can be installed through a filter registration.
// The class body is elided in this excerpt.
public class ServletContainer extends HttpServlet implements Filter { ...省略... }
二.EurekaServer初始化流程
这里我们整理一下EurekaServer启动时是以什么样的流程进行初始化的,下面是根据Eureka Server启动断点跟踪出来的流程
1.ServletContainer 初始化
首先 ServletContainer 会被创建并进行初始化,初始化过程中会调用 configure 方法进行配置;至于 doFilter 方法,则会在接收到请求时被执行
publicclassServletContainerextendsHttpServletimplementsFilter { ...省略... publicvoidinit(FilterConfigfilterConfig) throwsServletException { this.filterConfig=filterConfig; this.init((WebConfig)(newWebFilterConfig(filterConfig))); } protectedvoidconfigure(WebConfigwc, ResourceConfigrc, WebApplicationwa) { if (this.getServletConfig() !=null) { this.configure(this.getServletConfig(), rc, wa); } elseif (this.filterConfig!=null) { this.configure(this.filterConfig, rc, wa); } if (rcinstanceofReloadListener) { List<ContainerNotifier>notifiers=newArrayList(); Objecto=rc.getProperties().get("com.sun.jersey.spi.container.ContainerNotifier"); Iteratori$; if (oinstanceofContainerNotifier) { notifiers.add((ContainerNotifier)o); } elseif (oinstanceofList) { i$= ((List)o).iterator(); while(i$.hasNext()) { Objectelem=i$.next(); if (eleminstanceofContainerNotifier) { notifiers.add((ContainerNotifier)elem); } } } i$=ServiceFinder.find(ContainerNotifier.class).iterator(); while(i$.hasNext()) { ContainerNotifiercn= (ContainerNotifier)i$.next(); notifiers.add(cn); } rc.getProperties().put("com.sun.jersey.spi.container.ContainerNotifier", notifiers); } } protectedvoidconfigure(FilterConfigfc, ResourceConfigrc, WebApplicationwa) { rc.getSingletons().add(newServletContainer.ContextInjectableProvider(FilterConfig.class, fc)); Stringregex= (String)rc.getProperty("com.sun.jersey.config.property.WebPageContentRegex"); if (regex!=null&®ex.length() >0) { try { this.staticContentPattern=Pattern.compile(regex); } catch (PatternSyntaxExceptionvar6) { thrownewContainerException("The syntax is invalid for the regular expression, "+regex+", associated with the initialization parameter "+"com.sun.jersey.config.property.WebPageContentRegex", var6); } } this.forwardOn404=rc.getFeature("com.sun.jersey.config.feature.FilterForwardOn404"); this.filterContextPath=this.filterConfig.getInitParameter("com.sun.jersey.config.feature.FilterContextPath"); if (this.filterContextPath!=null) { if 
(this.filterContextPath.isEmpty()) { this.filterContextPath=null; } else { if (!this.filterContextPath.startsWith("/")) { this.filterContextPath='/'+this.filterContextPath; } if (this.filterContextPath.endsWith("/")) { this.filterContextPath=this.filterContextPath.substring(0, this.filterContextPath.length() -1); } } } } ...省略... }
2.Eureka上下文初始化
紧接着 EurekaServerContext 的 initialize 方法被调用,该方法上标注的 @PostConstruct 注解决定了它是初始化方法
publicclassDefaultEurekaServerContextimplementsEurekaServerContext { publicvoidinitialize() { logger.info("Initializing ..."); //开始集群节点更新peerEurekaNodes.start(); try { //服务注册器初始化registry.init(peerEurekaNodes); } catch (Exceptione) { thrownewRuntimeException(e); } logger.info("Initialized"); }
这里做了两个事情
- 调用 peerEurekaNodes.start(); 定时更新Eureka集群中的节点
- 调用服务注册器PeerAwareInstanceRegistryImpl的初始化init
3.启动PeerEurekaNodes集群节点更新
PeerEurekaNodes.start 被调用,这里通过定时器定时更新 Eureka 集群节点,默认每 10 分钟更新一次
publicclassPeerEurekaNodes { //开始publicvoidstart() { //创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器taskExecutor=Executors.newSingleThreadScheduledExecutor( newThreadFactory() { publicThreadnewThread(Runnabler) { Threadthread=newThread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); returnthread; } } ); try { //更新集群中的节点中的注册信息updatePeerEurekaNodes(resolvePeerUrls()); //创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());RunnablepeersUpdateTask=newRunnable() { publicvoidrun() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwablee) { logger.error("Cannot update the replica Nodes", e); } } }; //定时任务taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), //定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTESserverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exceptione) { thrownewIllegalStateException(e); } for (PeerEurekaNodenode : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } } }
定时调用 updatePeerEurekaNodes 方法更新集群,默认 10 分钟更新一次。更新逻辑是:删除已经失效的旧节点,添加新出现的节点;旧节点调用 shutdown 做关闭操作,新节点调用 createPeerEurekaNode 进行创建,集群节点最终存储在 List 结构中
4.服务注册器初始化
在 DefaultEurekaServerContext 中调用完 peerEurekaNodes.start() 方法后,会调用 PeerAwareInstanceRegistryImpl.init 方法进行注册器的初始化
//初始化方法publicvoidinit(PeerEurekaNodespeerEurekaNodes) throwsException { //最后一分钟的复制次数定时器Timer开始this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes=peerEurekaNodes; //初始化 ResponseCache(ResponseCacheImpl) ,负责缓存客户端查询的注册表信息initializedResponseCache(); //续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新scheduleRenewalThresholdUpdateTask(); //初始化远程注册表,默认么有远程RegioninitRemoteRegionRegistry(); try { //注册到对象监视器Monitors.registerObject(this); } catch (Throwablee) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } }
这里我们主要分析两个东西
- initializedResponseCache 初始化注册表响应缓存
- scheduleRenewalThresholdUpdateTask 定时更新续约阈值
initializedResponseCache初始化响应缓存
注意:这里有这么一句代码 initializedResponseCache,它初始化了一个 ResponseCache 响应缓存,ResponseCacheImpl 是具体实现。该类中构造了一个读写缓存 readWriteCacheMap 和一个只读缓存 readOnlyCacheMap。为什么叫响应缓存?因为客户端在获取服务注册表的时候,就会从 readOnlyCacheMap 缓存中去获取
// Response cache implementation backed by two maps: a read-only map and a loading
// read-write cache. Parts of the class are elided in this excerpt.
public class ResponseCacheImpl implements ResponseCache {
    ...省略...
    // Read-only cache.
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // Read-write cache.
    private final LoadingCache<Key, Value> readWriteCacheMap;

    /**
     * Builds the read-write cache and, when the read-only cache is enabled, schedules the
     * task that periodically copies entries from the read-write cache into it.
     *
     * @param serverConfig server configuration supplying the cache settings
     * @param serverCodecs codecs stored for later use
     * @param registry     the registry stored for later use
     */
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        // Whether the read-only cache is used (true by default per the original note).
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
        // Interval between read-only cache refreshes (30s per the original note).
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        // Build the read-write cache: entries expire a configured number of seconds after
        // being written, and values are produced on demand by generatePayload(key).
        this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000)
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        // Drop the region-specific bookkeeping entry for a removed key.
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    public Value load(Key key) throws Exception {
                        // Record region-specific keys under their region-less clone.
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        Value value = generatePayload(key);
                        return value;
                    }
                });
        // When the read-only cache is enabled, run the update task periodically; the first
        // run is aligned to the next multiple of the interval on the wall clock.
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }

    /**
     * Creates the task that refreshes the read-only cache: for every key currently in the
     * read-only map it fetches the value from the read-write cache and replaces the
     * read-only entry when the two references differ.
     *
     * @return the timer task performing the refresh
     */
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        // Reference comparison: only overwrite when the cached object changed.
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        // A failure for one key is logged and does not stop the remaining keys.
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}
scheduleRenewalThresholdUpdateTask 定时更新续约阈值
定时任务每隔 renewalThresholdUpdateIntervalMs(默认 900 秒,即 15 分钟)更新一次续约阈值
/**每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值* Schedule the task that updates <em>renewal threshold</em> periodically.* The renewal threshold would be used to determine if the renewals drop* dramatically because of network partition and to protect expiring too* many instances at a time.* */privatevoidscheduleRenewalThresholdUpdateTask() { //定时任务timer.schedule(newTimerTask() { publicvoidrun() { //更新续约阈值updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); //900s}
updateRenewalThreshold是具体的更新逻辑
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()/*** Updates the <em>renewal threshold</em> based on the current number of* renewals. The threshold is a percentage as specified in* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals* received per minute {@link #getNumOfRenewsInLastMin()}.*/privatevoidupdateRenewalThreshold() { try { //获取到注册表Applicationsapps=eurekaClient.getApplications(); intcount=0; // 计算有多少个注册的服务实例for (Applicationapp : apps.getRegisteredApplications()) { for (InstanceInfoinstance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } //枷锁synchronized (lock) { // Update threshold only if the threshold is greater than the// current expected threshold of if the self preservation is disabled.// 只有当阀值大于当前预期值时或者关闭了自我保护模式才更新 if ((count*2) > (serverConfig.getRenewalPercentThreshold() *numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) { //判断如果阈值时候大于预期的阈值 或者 关闭了我保护//更新每分钟的预期续订次数:服务数 * 2 ,每个客户端30s/次,1分钟2次this.expectedNumberOfRenewsPerMin=count*2; //更新每分钟阈值的续订次数 :服务数 * 2 * 0.85 (百分比阈值) this.numberOfRenewsPerMinThreshold= (int) ((count*2) *serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwablee) { logger.error("Cannot update renewal threshold", e); } }
当关闭了自我保护模式,或者按当前实例数计算出的每分钟预期续约次数(实例数 * 2)大于「续约百分比阈值 * 当前的每分钟续约阈值」时,就会更新续约阈值。具体的更新算法如下:
- 每分钟的预期续约次数 = 服务实例数 * 2,因为每个服务实例默认每 30 秒续约一次,即每分钟 2 次
- 每分钟续约阈值 = 服务实例数 * 2 * 0.85(0.85 为续约百分比阈值 renewalPercentThreshold 的默认值)
5.EurekaServer初始化配置
EurekaServerInitializerConfiguration 的start方法会在Spring容器刷新的时候调用,因为它实现了SmartLifecycle接口 , start方法中新开线程调用eurekaServerBootstrap.contextInitialized进行初始化
publicvoidstart() { newThread(newRunnable() { publicvoidrun() { try { //TODO: is this class even needed now?eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(newEurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running=true; publish(newEurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exceptionex) { // Help!log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
6.Eureka启动引导
EurekaServerBootstrap.contextInitialized 负责初始化 Eureka 环境和初始化上下文
//Eureka初始化publicvoidcontextInitialized(ServletContextcontext) { try { //初始化环境initEurekaEnvironment(); //初始化上下文initEurekaServerContext(); //设置上下文属性context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwablee) { log.error("Cannot bootstrap eureka server :", e); thrownewRuntimeException("Cannot bootstrap eureka server :", e); } }
在初始化上下文的时候会调用 PeerAwareInstanceRegistryImpl.syncUp(); 从相邻的集群节点同步注册表,通过PeerAwareInstanceRegistryImpl.register注册到当前Eureka节点
//初始化eurekaServer上下文protectedvoidinitEurekaServerContext() throwsException { ...省略... //把EurekaServerContext设置到EurekaServerContextHolder中EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现intregistryCount=this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics.//注册所有监视统计信息。EurekaMonitors.registerAllStats(); }
同步相邻节点的注册表PeerAwareInstanceRegistryImpl.syncUp()
publicintsyncUp() { // Copy entire entry from neighboring DS nodeintcount=0; //getRegistrySyncRetries重试次数默认5次for (inti=0; ((i<serverConfig.getRegistrySyncRetries()) && (count==0)); i++) { if (i>0) { try { //通信中断,等待下一次切换实例Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedExceptione) { logger.warn("Interrupted during registry transfer.."); break; } } //获取注册表Applicationsapps=eurekaClient.getApplications(); //循环服务列表,依次注册for (Applicationapp : apps.getRegisteredApplications()) { for (InstanceInfoinstance : app.getInstances()) { try { if (isRegisterable(instance)) { //获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwablet) { logger.error("During DS init copy", t); } } } } returncount; }
到这里EurekaServer就算初始化完成了
总结