题外话:
Spring Cloud Netflix 作为springcloud 我们常用的一个项目,其子项目Eureka,zuul,Rebbion是我熟悉的。但是Spring Cloud Netflix 被宣布进入了维护模式, 意思不再添加新特性了,这对于我们来说很不友好了。 大家纷纷寻找相应的替代工具。(具体可以网上搜索)
但这不影响我们学习一些组件的框架思想。我对注册发现,负载均衡这块比较感兴趣。所以在此记录下自己的阅读心得。
版本说明:Finchley.SR1
1.组件的配置:
1.1 启用Eureka注册中心
当我们在springboot的启动类上加上@EnableEurekaServer
,一个基本的注册中心就可以生效了。
@SpringBootApplication @EnableEurekaServer public class EurekaServerApplication { public static void main(String[] args) { SpringApplication.run(EurekaServerApplication.class, args); } }
@EnableEurekaServer
仅仅是引入EurekaServerMarkerConfiguration
类。 Marker的英文意思是标记的意思,spring相关框架中有很多类似xxxMarkerxxx
这样的注解.其实他们的意思就是一个开关。会在其他地方进行开关的判断,有对应xxxMarkerxxx
类就表示打开,没有表示关闭。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer { }
EurekaServerMarkerConfiguration
开关打开的是哪个类呢??
org.springframework.cloud.netflix.eureka.server
项目spring.factories资源文件中自动注入类EurekaServerAutoConfiguration
,此类在自动注入的过程中,会判断开关是否打开来决定是否自动注入相关类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { ..... }
由此看出EurekaServerMarkerConfiguration
开关打开的EurekaServerAutoConfiguration
。
1.2 组件的配置。
下面我们看看EurekaServerAutoConfiguration
配置了什么东西。 (1.先看注解上相关配置
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { ... }
- 引入
EurekaServerInitializerConfiguration
类,此类继承了SmartLifecycle
接口,所以会在spring启动完毕时回调此类的start()方法 - EurekaDashboardProperties 表示Euerka面板相关配置属性。例如:是否打开面板;面板的访问路径
- InstanceRegistryProperties 表示实例注册相关配置属性。例如:每分钟最大的续约数量,默认打开的通信数量 等
- 加载
/eureka/server.properties
的配置属性。
(2.再看类内部相关配置(代码比较长,这里只讲内容,建议打开源码看)寻找类中的Bean
- HasFeatures 注册HasFeatures表示Eureka特征,
- EurekaServerConfigBean配置类,表示EurekaServer的配置信息。通过
@ConfigurationProperties(“eureka.server”)
映射我们的配置文件中的eureka.server.xxxx
格式的配置信息(此类很重要啊,我们想修改EurekaServer的配置信息,可以配置eureka.server.xxxx
覆盖此类中的默认配置) - EurekaController: 面板的访问配置默认是“/”
- 注册编码器
(ServerCodecs)CloudServerCodecs
- PeerAwareInstanceRegistry:对等节点同步器。 多个节点下复制相关。 与注册中心高可用有关的组件。此处注册的是
InstanceRegistry
(注意PeerAwareInstanceRegistry实现了AbstractInstanceRegistry,这里准确的说是 对等节点+当前节点同步器) - PeerEurekaNodes: Eureka-Server 集群节点的集合。存储了集群下各个节点信息。也是与高可用有关。
- EurekaServerContext : 上下文。默认注册的
DefaultEurekaServerContext
- EurekaServerBootstrap: EurekaServer启动器。EurekaServerBootstrap
- FilterRegistrationBean: 注册 Jersey filter过滤器。这里有必要讲一下。Eureka也是servlet应用。不过他是通过Jersey 框架来提供接口的。Jersey 框架是一个类Springmvc的web框架。我们项目中大多都是使用springmvc来处理。所以注册 Jersey filter过滤器,把
/eureka
开头的请求都交给Jersey 框架去解析。容器是com.sun.jersey.spi.container.servlet.ServletContainer
- ApplicationResource: 暴漏
com.netflix.discovery","com.netflix.eureka"
包路径下的接口。通常我们再springmvc中通过Controller概念来表示接口,Jersey框架下用ApplicationResource的概念来表示接口。暴露的接口其实就是eureka各个应用通信的接口。(下面再说这些接口)
EurekaServerAutoConfiguration
基本上就做了这些工作。我们来归类总结下
针对当前Eureka实例的相关组件:
- EurekaDashboardProperties:面板属性
- EurekaController: 面板的访问的处理器。
- InstanceRegistryProperties:实例注册相关属性
- (EurekaServerConfig)EurekaServerConfigBean:当前ErekekaServer相关配置
- EurekaServerContext : 当前Eureka 注册中心上下文
- 请求相关组件:注册
/eureka
路径的相关接口,注册拦截/eureka
的拦截器,注册com.sun.jersey.spi.container.servlet.ServletContainer
容器来处理对应的请求
两个针对集群下相关组件:
- PeerAwareInstanceRegistry:用于集群下的节点相关复制信息用
- PeerEurekaNodes:集群下的所有节点信息
两个针对启动相关类:
- EurekaServerInitializerConfiguration: 对接spring,再spring启动完成后,调用
- EurekaServerBootstrap:启动器,用于启动当前Eureak实例的上下文
至此:我们也可以大致了解了一个EurekaServer大致长什么样子了。
2.EurekaServerContext初始化:
EurekaServerContext作为上下文,应该是核心所在。上文讲过注册DefaultEurekaServerContext
。此类中有@Inject,@PostConstruct, @PreDestroy
注解的方法,重点来看看。
@Inject public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.registry = registry; this.peerEurekaNodes = peerEurekaNodes; this.applicationInfoManager = applicationInfoManager; }
2.1 @Inject注解的构造方法
@Inject
注解的方法,参数由IOC容器注入。serverConfig ,serverCodecs ,registry ,peerEurekaNodes
我们已经认识了。ApplicationInfoManager 是用来管理应用信息的,也就是实例注册信息,由ApplicationInfoManager统一管理。
2.2 @PostConstruct注解的initialize()方法
@PostConstruct
修饰的方法会在服务器加载Servle的时候运行,并且只会被服务器执行一次,被@PostConstruct
修饰的方法会在构造函数之后,init()方法之前运行.
@PostConstruct @Override public void initialize() { logger.info("Initializing ..."); peerEurekaNodes.start(); try { registry.init(peerEurekaNodes); } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); }
这个方法很简明,主要有两个重要的的点:
- peerEurekaNodes.start();
- registry.init(peerEurekaNodes);
2.2.1 peerEurekaNodes.start()
PeerEurekaNodes: 用于管理PeerEurekaNode节点集合。 peerEurekaNodes.start();
public void start() { //创建一个单线程定时任务线程池:线程的名称叫做Eureka-PeerNodesUpdater taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 解析Eureka Server URL,并更新PeerEurekaNodes列表 updatePeerEurekaNodes(resolvePeerUrls()); //创建任务 //任务内容为:解析Eureka Server URL,并更新PeerEurekaNodes列表 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; //交给线程池执行,执行间隔10min taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } }
resolvePeerUrls():
解析配置的对等体URL。就是在配置文件中配置的多个Eureka注册中心的URL.
updatePeerEurekaNodes:
protected void updatePeerEurekaNodes(List<String> newPeerUrls) { //计算需要移除的url= 原来-新配置。 Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); //计算需要增加的url= 新配置-原来的。 Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); //没有变化就不更新 if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); // 删除需要移除url对应的节点。 if (!toShutdown.isEmpty()) { int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // 添加需要增加的url对应的节点 if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } //更新节点列表 this.peerEurekaNodes = newNodeList; //更新节点url列表 this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); }
总结:start()方法,其实就是完成新配置的eureka集群信息的初始化更新工作。
2.2.2 registry.init(peerEurekaNodes)
对等节点同步器的初始化。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { //统计最近X秒内的来自对等节点复制的续约数量(默认1秒) this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; //初始化返回结果缓存 initializedResponseCache(); //更新续约阀值 scheduleRenewalThresholdUpdateTask(); //初始化远程区域注册 相关信息 initRemoteRegionRegistry(); ... }
numberOfReplicationsLastMin.start():
启动一个定时任务,任务名称为Eureka-MeasureRateTimer
,每1秒统计从对等节点复制的续约数,将当前的桶的统计数据放到lastBucket,当前桶置为0
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1); -- this.timer = new Timer("Eureka-MeasureRateTimer", true); --- timer.schedule(new TimerTask() { @Override public void run() { try { // Zero out the current bucket. lastBucket.set(currentBucket.getAndSet(0)); } catch (Throwable e) { logger.error("Cannot reset the Measured Rate", e); } } }, sampleInterval, sampleInterval);
注意:此统计器用于节点之间复制的统计。