一、Eureka架构
从下面的架构图中可以看出,不管是服务的调用者还是服务的提供者都可以认为是一个EurekaClient,在启动的过程中会将自身注册到EurekaServer中(也就是Eureka注册中心),并通过心跳保持服务的续约。
springcloud高可用架构图
CAP
一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)
Eureka保证AP(ZK保证CP)
注册过程
1、Register(注册): 服务提供者向server注册或者更新自己的信息
2、Replicate(同步): server集群之间同步注册信息
3、Get(获取):服务消费者从server获取其他服务注册信息,并缓存到本地
4、renew(续约):client向server发送心跳维持元数据有效性,一定时间未收到心跳则从自身注册表中删除
5、服务下线:client下线时向server发送信息,server会注销该服务元数据,并通知服务消费者。
二、Eureka的启动注册
注册是InstanceId的生成可以查看另一篇SpringCloud学习笔记(四)-InstanceId的生成
通常我们通过@EnableDiscoveryClient来声明一个Eureka客户端。源码注释这样描述EnableDiscoveryClient
Annotation to enable a DiscoveryClient implementation
即通过该注释会开启一个DiscoveryClient实例,DiscoveryClient类源码可以看到,DiscoveryClient的构造方法中有如下代码段。
// default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff
而通过DiscoveryClient的源码可以看到,在服务启动时,会启动两个线程池,一个用于心跳的保持,一个用于缓存的刷新。进一步跟踪源码,在initScheduledTasks中,开启心跳定时器,同时会启动一个线程InstanceInfoReplicator,这个线程会将本地实例信息更新到远程服务器,也就是将当前服务注册到注册中心。源码如下:
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
这里有个小细节,instanceInfoReplicator在启动的时候,会有一定时间的延迟,默认是40秒,也就是client在启动时,不是马上向注册中心注册,而是会延迟40秒再注册。
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); public int getInitialInstanceInfoReplicationIntervalSeconds() { return configInstance.getIntProperty( namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get(); }
接着看register方法, 该方法会将实例元数据信息(instanceInfo)通过http方式注册到Eureka服务端。
boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
三、EurekaClient的心跳保持
上一小节已经提到了在启动过程中会初始化心跳定时器
//服务刷新默认30秒,可通过eureka.instance.lease-renewal-interval-in-seconds修改 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS);
心跳线程,定时向注册中心发送http请求,如果返回404,会重新注册。
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); return register(); } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }
四、总结
在spring-cloud-netflix-eureka-client包下spring.factories可以查看配置入口(EurekaClientAutoConfiguration)
1:项目启动
2:初始化配置EurekaClientAutoConfiguration->eurekaInstanceConfigBean
3:构造EurekaClient对象(内部类EurekaClientAutoConfiguration::RefreshableEurekaClientConfiguration)
3.1:构造心跳任务线程池
3.2:构造缓存刷新任务线程池
4:启动定时任务(心跳+缓存刷新)
4.1:启动缓存刷新定时任务
4.2:启动心跳定时任务
4.3:启动instanceInfoReplicator线程,执行注册任务
5:服务启动时,会延迟40秒向注册中心注册
6:心跳时间默认是30秒,可通过eureka.instance.lease-renewal-interval-in-seconds修改