前言
SpringCloud已经成为落地Java微服务架构最流行的框架之一,对于一个优秀的程序员而言不仅需要灵活的使用SpringCloud,还要知道他的实现原理和思想,知其然知其所以然,只有这样我们写出来的代码才有灵魂,熟读源码无论对解决开发中的BUG,还是应对面试对我们都有很大的帮助,本章将带领大家一步一步分析SpringCloud的源码。
一.Eureak核心功能
我们先来回顾一下Eureka的几个核心功能
1.1.服务注册
Eureka是一个服务注册与发现
组件,它包含了EurekaServer 服务端(也叫注册中心)和 EurekaClient客户端两部分组成,EureakServer
是独立的服务,而EurekaClient
需要集成到每个微服务中。
微服务(EurekaClient,通常是提供者服务)在启动的时候会主动向EureakServer注册自己(如提交自己的服务名,ip,端口等),在 EurekaServer会形成一个服务注册列表。
1.2.服务发现
微服务(EurekaClient,通常是消费者服务)会定期(默认30s/次)从EureakServer拉取服务注册列表
缓存到本地,当提供者服务向消费者微服务发起调用的时候,会根据目标服务的服务名从服务注册列表
找到一个或者多个服务实例(目标服务可能做了集群),然后使用负载均衡算法
(Ribbon)选择其中一个服务实例,最后向该服务实例发起远程调用。
1.3.服务续约
另外,微服务(EurekaClient)采用定时(LeaseRenewalIntervalInSeconds:默认30s)发送“心跳
”请求向EureakServer发请求进行服务续约
,其实就是定时向 EureakServer发请求报告自己的健康状况,目的是告诉EureakServer自己还活着,不要把自己从服务注册列表
中剔除掉,那么当微服务(EurekaClient)宕机或因为网络波动未向EureakServer续约(默认3次续约失败,即达到90s未续约),注册中心机会从服务地址清单中剔除该续约失败的服务
。
1.4.服务下线
微服务(EurekaClient)关闭服务前向注册中心发送下线请求
,注册中心(EurekaServer)接受到下线请求负责将该服务实例从注册列表剔除
下面我们用一张图来介绍Eureka的工作流程
二.Eureka核心概念
要跟踪Eurkea的源码,我们必须对一些核心概念以及一些核心功能类做 一些了解,待会儿看源码的时候才不会那么懵逼
2.1.com.netflix.appinfo.InstanceInfo
该类是Eureka用来封装服务实例信息的对象,用来代表服务注册的实例,通过它实现服务注册,其中包括:服务实例的ID,服务的名字,服务的IP地址,服务端口,主机
等等参数,该对象贯穿了整个Eureak客户端和Eureka服务端 , InstanceInfo 源码如下
//封装了服务注册所需要的注册信息EurekaConfigBasedInstanceInfoProvider.class) ("com.netflix.discovery.converters.EntityBodyConverter") ("instance") ("instance") (publicclassInstanceInfo { ...省略... //实例IDprivatevolatileStringinstanceId; //服务名privatevolatileStringappName; privatevolatileStringappGroupName; //服务地址privatevolatileStringipAddr; //主机privatevolatileStringhostName; //端口privatevolatileintport=DEFAULT_PORT; ...省略...
该类是如何被创建的呢?在Eureak环境中通过 EurekaInstanceConfigBean
读取yml中的Eureka 实例配置,而InstanceInfo
通过 InstanceInfoFactory.create
方法来构建然后把EurekaInstanceConfigBean
获取实例配置信息绑定到InstanceInfo
对象中 ,下面是InstanceInfoFactory.create
的部分原码
publicclassInstanceInfoFactory { //创建服务注册实例对象,EurekaInstanceConfig 是用来加载配置文件中的服务配置信息publicInstanceInfocreate(EurekaInstanceConfigconfig) { //服务续约Builder设置续约心跳间隔时间和续约超时时间LeaseInfo.BuilderleaseInfoBuilder=LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); //翻译:构建要注册到Eureka服务端的服务实例,绑定相关的配置项目,这里的config就是// Builder the instance information to be registered with eureka// serverInstanceInfo.Builderbuilder=InstanceInfo.Builder.newBuilder(); builder.setNamespace(namespace).setAppName(config.getAppname()) .setInstanceId(config.getInstanceId()) .setAppGroupName(config.getAppGroupName()) .setDataCenterInfo(config.getDataCenterInfo()) .setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false)) .setPort(config.getNonSecurePort()) .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled()) .setSecurePort(config.getSecurePort()) .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled()) .setVIPAddress(config.getVirtualHostName()) .setSecureVIPAddress(config.getSecureVirtualHostName()) .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl()) .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl()) .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(), config.getSecureHealthCheckUrl()) .setASGName(config.getASGName()); .....省略...... //添加元数据信息// Add any user-specific metadata informationfor (Map.Entry<String, String>mapEntry : config.getMetadataMap().entrySet()) { Stringkey=mapEntry.getKey(); Stringvalue=mapEntry.getValue(); // only add the metadata if the value is presentif (value!=null&&!value.isEmpty()) { builder.add(key, value); } } //构建服务注册实例对象InstanceInfoinstanceInfo=builder.build(); //设置服务租约对象instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); returninstanceInfo; }
这里创建了2个对象
- 通过InstanceInfo.Builder 构建 InstanceInfo
- 通过LeaseInfo.Builder 构建LeaseInfo
2.2.com.netflix.appinfo.LeaseInfo
该对象用来描述服务的续约信息
,比如约定的心跳周期,租约有效期,最近一次续约时间等,也是通过InstanceInfoFactory.create
来创建(见上面)源码如下
"leaseInfo") (publicclassLeaseInfo { publicstaticfinalintDEFAULT_LEASE_RENEWAL_INTERVAL=30; publicstaticfinalintDEFAULT_LEASE_DURATION=90; // Client settings//客户端:续约间隔周期时间 30/s 心跳机制privateintrenewalIntervalInSecs=DEFAULT_LEASE_RENEWAL_INTERVAL; //客户端:续约有效时长超过 90s续约失败(服务端会剔除该实例)privateintdurationInSecs=DEFAULT_LEASE_DURATION; // Server populated//服务端: 服务端设置该租约第一次续约时间privatelongregistrationTimestamp; //服务端: 服务端设置该租约最后一次续约时间privatelonglastRenewalTimestamp; //服务端: 服务端设置该租约被剔除时间privatelongevictionTimestamp; //服务端: 服务端设置该服务上线 up 时间privatelongserviceUpTimestamp; }
这些参数用来维持续约心跳,心跳周期,续约有效期,最近一次续约时间,最后一次续约时间
等
2.3.org.springframework.cloud.client.ServiceInstance
服务发现的抽象接口,约定了服务发现的实例应用有哪些通用的信息,是spring cloud对service discovery的实例信息的抽象接口,该接口可以适配多种注册中心如:Eureka,Zookepper,Consul ,源码如下
/*** Represents an instance of a Service in a Discovery System* @author Spencer Gibb*/publicinterfaceServiceInstance { /**服务ID* @return the service id as registered.*/StringgetServiceId(); /**服务主机* @return the hostname of the registered ServiceInstance*/StringgetHost(); /**服务端口* @return the port of the registered ServiceInstance*/intgetPort(); /**是否开启https* @return if the port of the registered ServiceInstance is https or not*/booleanisSecure(); /**服务的URI地址* @return the service uri address*/URIgetUri(); /**实例的元数据信息* @return the key value pair metadata associated with the service instance*/Map<String, String>getMetadata(); /*** @return the scheme of the instance*/defaultStringgetScheme() { returnnull; } }
2.4.com.netflix.appinfo.InstanceInfo.InstanceStatus
用来表示服务实例的状态,状态有:UP上线,DOWN下线,STARTING运行中,OUT_OF_SERVICE下线,UNKNOWN未知
EurekaConfigBasedInstanceInfoProvider.class) ("com.netflix.discovery.converters.EntityBodyConverter") ("instance") ("instance") (publicclassInstanceInfo { ...省略... publicenumInstanceStatus { UP, // Ready to receive trafficDOWN, // Do not send traffic- healthcheck callback failedSTARTING, // Just about starting- initializations to be done - do not// send trafficOUT_OF_SERVICE, // Intentionally shutdown for trafficUNKNOWN; publicstaticInstanceStatustoEnum(Strings) { if (s!=null) { try { returnInstanceStatus.valueOf(s.toUpperCase()); } catch (IllegalArgumentExceptione) { // ignore and fall through to unknownlogger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN); } } returnUNKNOWN; } } }
2.5.com.netflix.discovery.shared.Application
Application : 一个Application代表一个应用,里面包含了应用实例列表,即:包含了多个InstanceInfo
实例,源码如下
/**包含了应用实例列表* The application class holds the list of instances for a particular* application.** @author Karthik Ranganathan**/"com.netflix.discovery.converters.EntityBodyConverter") ("application") ("application") (publicclassApplication { privatestaticRandomshuffleRandom=newRandom(); privateStringname; privatevolatilebooleanisDirty=false; privatefinalSet<InstanceInfo>instances; //无序状态实例列表privatefinalAtomicReference<List<InstanceInfo>>shuffledInstances; //map缓存服务ID 对应 实例关系privatefinalMap<String, InstanceInfo>instancesMap; ...省略... /**翻译:添加实例信息* Add the given instance info the list.** @param i* the instance info object to be added.*/publicvoidaddInstance(InstanceInfoi) { instancesMap.put(i.getId(), i); synchronized (instances) { instances.remove(i); instances.add(i); isDirty=true; } } /**翻译:移出实例信息* Remove the given instance info the list.** @param i* the instance info object to be removed.*/publicvoidremoveInstance(InstanceInfoi) { removeInstance(i, true); } /**翻译:获取实例信息列表* Gets the list of instances associated with this particular application.* <p>* Note that the instances are always returned with random order after* shuffling to avoid traffic to the same instances during startup. The* shuffling always happens once after every fetch cycle as specified in* {@link EurekaClientConfig#getRegistryFetchIntervalSeconds}.* </p>** @return the list of shuffled instances associated with this application.*/"instance") (publicList<InstanceInfo>getInstances() { returnOptional.ofNullable(shuffledInstances.get()).orElseGet(this::getInstancesAsIsFromEureka); } /**通过id获取实例信息* Get the instance info that matches the given id.** @param id* the id for which the instance info needs to be returned.* @return the instance info object.*/publicInstanceInfogetByInstanceId(Stringid) { returninstancesMap.get(id); } ...省略... }
该类中还提供了一些列方法:
void addInstance(InstanceInfo i
添加实例,void removeInstance(InstanceInfo i)
移出实例,List<InstanceInfo> getInstances()
获取实例列表,InstanceInfo getByInstanceId(String id)
通过id获取实例
2.6.com.netflix.discovery.shared.Applications
这个是服务注册列表对象,该类包装了由eureka服务器返回的所有注册表信息,源码如下:
/**翻译:该类包装了由eureka服务器返回的所有注册表信息------------------------------------------------------------------------------* The class that wraps all the registry information returned by eureka server.翻译: EurekaClientConfig#getRegistryFetchIntervalSeconds() 方法从 Eureak Server获取服务注册列表,然后对服务进行过滤,按照(EurekaClientConfig#shouldFilterOnlyUpInstances())的规则过滤上线的服务------------------------------------------------------------------------------* <p>* Note that the registry information is fetched from eureka server as specified* in {@link EurekaClientConfig#getRegistryFetchIntervalSeconds()}. Once the* information is fetched it is shuffled and also filtered for instances with* {@link InstanceStatus#UP} status as specified by the configuration* {@link EurekaClientConfig#shouldFilterOnlyUpInstances()}.* </p>** @author Karthik Ranganathan**/"com.netflix.discovery.converters.EntityBodyConverter") ("applications") ("applications") (publicclassApplications { privatestaticclassVipIndexSupport { finalAbstractQueue<InstanceInfo>instances=newConcurrentLinkedQueue<>(); finalAtomicLongroundRobinIndex=newAtomicLong(0); finalAtomicReference<List<InstanceInfo>>vipList=newAtomicReference<List<InstanceInfo>>(Collections.emptyList()); publicAtomicLonggetRoundRobinIndex() { returnroundRobinIndex; } publicAtomicReference<List<InstanceInfo>>getVipList() { returnvipList; } } privatestaticfinalStringSTATUS_DELIMITER="_"; privateStringappsHashCode; privateLongversionDelta; //注册成功的服务的集合privatefinalAbstractQueue<Application>applications; //注册成功的服务的集合,名字和服务对应关系privatefinalMap<String, Application>appNameApplicationMap; privatefinalMap<String, VipIndexSupport>virtualHostNameAppMap; privatefinalMap<String, VipIndexSupport>secureVirtualHostNameAppMap; /**创建一个新的空的Eureka应用程序列表。* Create a new, empty Eureka application list.*/publicApplications() { this(null, -1L, Collections.emptyList()); } /**这里是把注册成功的服务实例保存起来 registeredApplications是注册成功的实例* Note that appsHashCode and versionDelta key names are formatted in a* custom/configurable way.*/publicApplications( ("appsHashCode") StringappsHashCode, "versionDelta") LongversionDelta, ("application") List<Application>registeredApplications) { (this.applications=newConcurrentLinkedQueue<Application>(); this.appNameApplicationMap=newConcurrentHashMap<String, Application>(); this.virtualHostNameAppMap=newConcurrentHashMap<String, VipIndexSupport>(); this.secureVirtualHostNameAppMap=newConcurrentHashMap<String, VipIndexSupport>(); this.appsHashCode=appsHashCode; this.versionDelta=versionDelta; //添加到 队列中 applications 和 appNameApplicationMap map中for (Applicationapp : registeredApplications) { this.addApplication(app); } } ...省略... /**添加应用* Add the <em>application</em> to the list.** @param app* the <em>application</em> to be added.*/publicvoidaddApplication(Applicationapp) { appNameApplicationMap.put(app.getName().toUpperCase(Locale.ROOT), app); addInstancesToVIPMaps(app, this.virtualHostNameAppMap, this.secureVirtualHostNameAppMap); applications.add(app); } /**获取所有注册成功的应用* Gets the list of all registered <em>applications</em> from eureka.** @return list containing all applications registered with eureka.*/"application") (publicList<Application>getRegisteredApplications() { returnnewArrayList<Application>(this.applications); } /**根据名字获取注册成功的应用* Gets the list of all registered <em>applications</em> for the given* application name.** @param appName* the application name for which the result need to be fetched.* @return the list of registered applications for the given application* name.*/publicApplicationgetRegisteredApplications(StringappName) { returnappNameApplicationMap.get(appName.toUpperCase(Locale.ROOT)); } //总共有多个个实例publicintsize() { returnapplications.stream().mapToInt(Application::size).sum(); } //把应用的顺序打乱publicvoidshuffleInstances(booleanfilterUpInstances) { shuffleInstances(filterUpInstances, false, null, null, null); } }
该类中制定了一些应用操作方法,主要如下
public void addApplication(Application app):
添加一个应用public List<Application> getRegisteredApplications():
获取所有注册的应用public Application getRegisteredApplications(String appName):
根据名字获取注册的应用public int size()
:实例总和
2.6.ApplicationInfoManager
实例管理器,在EurekaClientAutoConiguration中根据EurekaInstanceConfig和InstanceInfo创建得到ApplicationInfoManager,它的作用主要是来操作 InstanceInfo
, 主要提供了对服务实例状态的设置,eureka状态改变的监听,以及instanceinfo,LeaseInfo租约信息刷新
等等。
publicclassApplicationInfoManager { privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ApplicationInfoManager.class); privatestaticfinalInstanceStatusMapperNO_OP_MAPPER=newInstanceStatusMapper() { publicInstanceStatusmap(InstanceStatusprev) { returnprev; } }; privatestaticApplicationInfoManagerinstance=newApplicationInfoManager(null, null, null); //实例状态改变监听器protectedfinalMap<String, StatusChangeListener>listeners; privatefinalInstanceStatusMapperinstanceStatusMapper; //实例信息对象privateInstanceInfoinstanceInfo; //实例配置对象privateEurekaInstanceConfigconfig; ...省略... //注册元数据publicvoidregisterAppMetadata(Map<String, String>appMetadata) { instanceInfo.registerRuntimeMetadata(appMetadata); } //设置euerka状态改变publicsynchronizedvoidsetInstanceStatus(InstanceStatusstatus) { InstanceStatusnext=instanceStatusMapper.map(status); if (next==null) { return; } //设置状态InstanceStatusprev=instanceInfo.setStatus(next); if (prev!=null) { for (StatusChangeListenerlistener : listeners.values()) { try { //发布状态改变时间,对应的监听器会监听到该状态的改变listener.notify(newStatusChangeEvent(prev, next)); } catch (Exceptione) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } } //注册状态改变的监听器publicvoidregisterStatusChangeListener(StatusChangeListenerlistener) { listeners.put(listener.getId(), listener); } //取消注册监听器publicvoidunregisterStatusChangeListener(StringlistenerId) { listeners.remove(listenerId); } /**如果EurekaInstanceConfig中配置数据改变,刷新数据中心的数据,做InstanceInfo数据刷新,发送给下一次心跳* Refetches the hostname to check if it has changed. If it has, the entire* <code>DataCenterInfo</code> is refetched and passed on to the eureka* server on next heartbeat.** see {@link InstanceInfo#getHostName()} for explanation on why the hostname is used as the default address*/publicvoidrefreshDataCenterInfoIfRequired() { StringexistingAddress=instanceInfo.getHostName(); StringnewAddress; if (configinstanceofRefreshableInstanceConfig) { // Refresh data center info, and return up to date addressnewAddress= ((RefreshableInstanceConfig) config).resolveDefaultAddress(true); } else { newAddress=config.getHostName(true); } StringnewIp=config.getIpAddress(); if (newAddress!=null&&!newAddress.equals(existingAddress)) { logger.warn("The address changed from : {} => {}", existingAddress, newAddress); // :( in the legacy code here the builder is acting as a mutator.// This is hard to fix as this same instanceInfo instance is referenced elsewhere.// We will most likely re-write the client at sometime so not fixing for now.InstanceInfo.Builderbuilder=newInstanceInfo.Builder(instanceInfo); builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo()); instanceInfo.setIsDirty(); } } //刷新租约信息,根据EurekaInstanceConfig重新设置LeaseInfopublicvoidrefreshLeaseInfoIfRequired() { LeaseInfoleaseInfo=instanceInfo.getLeaseInfo(); if (leaseInfo==null) { return; } intcurrentLeaseDuration=config.getLeaseExpirationDurationInSeconds(); intcurrentLeaseRenewal=config.getLeaseRenewalIntervalInSeconds(); if (leaseInfo.getDurationInSecs() !=currentLeaseDuration||leaseInfo.getRenewalIntervalInSecs() !=currentLeaseRenewal) { LeaseInfonewLeaseInfo=LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(currentLeaseRenewal) .setDurationInSecs(currentLeaseDuration) .build(); instanceInfo.setLeaseInfo(newLeaseInfo); instanceInfo.setIsDirty(); } } ...省略... }
三.服务注册与发现核心类
上面我们介绍了Eureak里面的几个核心概念,我们知道Eureak功能包括,服务注册发现,服务续约,服务剔除,服务下线等功能,围绕这些功能,Eurkea提供了一些核心类
下面这里类来源于netflix eureka的规范
3.1.com.netflix.eureka.lease.LeaseManager
负责处理服务续约,负责注册租约/续约/续约/取消租约/租约过期等
3.2. com.netflix.discovery.shared.LookupService
服务发现的接口,提供了查找应用列表(Applications) 和 应用实例信息的方法
3.3.com.netflix.eureka.registry.InstanceRegistry
应用实例注册表接口,继承LookupService 和LeaseManager 接口,提供应用实例的注册与发现服务功能,并且做了一些功能的扩展
3.4. com.netflix.eureka.registry.AbstractInstanceRegistry
应用实例注册表的抽象实现类,里面包括了服务注册,取消注册,服务续约,修改服务状态,获取服务注册表等等基本方法
3.5.com.netflix.eureka.registry.PeerAwareInstanceRegistry
应用对象注册表接口,提供了Eureka群内注册信息的同步功能。
3.6.com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
应用对象注册表接口实现,是PeerAwareInstanceRegistry的子类
SpringCloud 在Netflix的基础上延伸了下面这些类
3.7.org.springframework.cloud.netflix.eureka.server.InstanceRegistry
这是一个类,是com.netflix.eureka.registry.InstanceRegistry接口的后代子孙,它继承于PeerAwareInstanceRegistryImpl,复写了相关方法(注册,续约,下线等等),结合当前上下文环境做了一些功能扩展。
3.8.org.springframework.cloud.client.serviceregistry.ServiceRegistry
服务注册接口,提供了服务注册,取消注册,设置服务状态,获取服务状态方法。
3.9.org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry
服务注册器,是ServiceRegistry接口的具体实现
3.10.org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration
Eurak的注册信息,实现了ServiceInstance
3.11.org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
Eureka自动注册配置类,在SpringBoot启动时该类被注册
3.12.org.springframework.cloud.netflix.eureka.EurekaClientConfigBean
这个类实现了netflix 的 EurekaClientConfig客户端配置接口,是对Eureka客户端的配置对象,即我们在配置文件中配置的 eureka.client
节点下的配置都会被封装到该对象中,该对象中会创建一个 EurekaTransportConfig
对象,EurekaTransportConfig是EurekaClient和EurekaServer的通信配置。配置如:
- enabled 默认值:true ,是否开启Eureka client
registryFetchIntervalSeconds
:默认值: 30,client从server获取服务列表信息的间隔instanceInfoReplicationIntervalSeconds
: server复制实例更改的间隔 30/sinitialInstanceInfoReplicationIntervalSeconds
: server复制实例信息的间隔 40/seurekaServerReadTimeoutSeconds
: 从server读取所需的超时时间 8/seurekaServerConnectTimeoutSeconds
:连接server的超时时间 5 /s
…省略…
3.13.org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean
该类实现了netflix的EurekaInstanceConfig接口,是的服务实例信息配置,ApplicationManager通过该接口用来构建InstanceConfig,比如我们在配置文件中配置的eureka.instance
开头的配置就会配置到该对象中,配置如:
hostname
:主机名leaseRenewalIntervalInSeconds
:续约心跳间隔时间 默认 30/sleaseExpirationDurationInSeconds
: 租约到期时间instanceId
:实例的ID