一,客户端的心跳机制
在就行完成这个服务注册和这个服务发现之后,因此需要一个心跳机制,来实现这个注册中心和各个微服务之间实现这个长连接。
1,registerInstance实例注册
依旧是在这个NacosNamingService的这个类里面,有这个注册实例的registerInstance方法。在获取这个实例服务之后,如果是临时实例,就会添加一个心跳机制。添加一个服务默认是一个临时实例。
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); //获取实例服务 String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); //如果当前实例是一个临时实例 if (instance.isEphemeral()) { //构建心跳信息,如一些心跳周期等 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName,instance); //添加这个构建的心跳信息 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
1.1, buildBeatInfo方法构建心跳实例信息
里面会有一个构建心跳的一个信息,比如说服务,端口号,默认ip等。其最主要的就是里面有一个Period的一个周期,就是一些心跳的发送间隔,健康检查的间隔等。
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(groupedServiceName); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); //设置周期 beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; }
在这个周期里面,会对这个心跳做一个基础的默认配置,如下
//心跳间隔的周期 public long getInstanceHeartBeatInterval() { return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL); } //心跳超时时间 public long getInstanceHeartBeatTimeOut() { return this.getMetaDataByKeyWithDefault("preserved.heart.beat.timeout", Constants.DEFAULT_HEART_BEAT_TIMEOUT); } //服务删除时间 public long getIpDeleteTimeout() { return this.getMetaDataByKeyWithDefault("preserved.ip.delete.timeout", Constants.DEFAULT_IP_DELETE_TIMEOUT); } static { //心跳周期5s DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); //心跳超时时间,默认15s,超过15s设置为不健康实例 DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); //实例删除的超时时间,30s没有收到客户端的心跳,就将该客户端注册的实例删除 DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); }
1.2,addBeatInfo添加心跳实例信息
这里面的话主要是有一个延时任务,在这个实例成功注册之后,再过5s才会开始执行这个任务。也就是过了5s之后才开始执行这个任务。
public void addBeatInfo(String sn, BeatInfo bi) { //bi.getPeriod():多久执行一次 executorService.schedule(new BeatTask(bi), bi.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
再来看看这个任务里面的执行流程BeatTask,接下来看这个线程的run方法,里面有几个重要的代码
//会去发送一个服务心跳 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); //然后执行完又会开启一个延迟任务,循环调用 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
在这个sendBeat方法里面,主要有一个这个reqApi方法的操作,可以在这个nacos的文档里面知道这就是一个发送心跳实例
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
二,服务端记录心跳
每个客户端都会向这个web服务端发送心跳,因此服务端这边需要去记录这个心跳发送的时间等,从而来判断实例是否健康,是否需要删除等操作。
由于服务端是一个web服务,那么只要涉及到这个和服务端建立连接和请求,一般是在这个InstanceController类,这个InstanceController类需要看源码才可以找到。可以去hithub里面下载,也可以去我的gitee上面下载.
https://gitee.com/zhenghuisheng/nacos-1.4.2-source-code
这个服务端记录心跳是在这个beat方法里面。
public ObjectNode beat(HttpServletRequest request) throws Exception { //首先会去解析这个beat心跳,并且可以解析是哪个集群,哪个组过来的服务 String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); //注册中心会检查这个实例是否存在 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); //如果实例不存在,就是第一次服务端给客户端发送心跳 if(instance == null){ //那么就会将这个实例注册到这个注册中心里面 serviceManager.registerInstance(namespaceId, serviceName, instance); } //再去注册表中获取这个实例 Service service = serviceManager.getService(namespaceId, serviceName); //心跳健康检查 service.processClientBeat(clientBeat); }
心跳健康检查processClientBeat方法,主要是查看这个实例是否处于这个健康状态
public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); //健康检查的定时任务 HealthCheckReactor.scheduleNow(clientBeatProcessor); }
接下来看看这个ClientBeatProcessor这个线程类,主要是看里面的run方法。主要是会对这些实例进行一个续约的操作,如果一个实例不健康,则会设置成健康状态
public void run(){ //获取全部实例 List<Instance> instances = cluster.allIPs(true); //遍历这些实例 for (Instance instance : instances) { //设置这个实例上一次心跳的发送时间,进行一个续约的操作 instance.setLastBeat(System.currentTimeMillis()); //如果实例不健康,则设置为健康状态 if (!instance.isHealthy()) { instance.setHealthy(true); } } }
在这个scheduleNow方法中,会做一个全局的服务健康检查。相当于是每过15秒,就会进行一个全局的定时任务,判断服务是否处于一个健康实例的状态
GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
三,何时开启这个心跳检查
就是在这个服务注册的时候,会有一个init的初始化方法。在服务进行初始化的时候,会去开启这个客户端的心跳健康检查的这个线程。
public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
接下来进入这个ClientBeatCheckTask的这个线程类里面,主要看他的run方法。和上面的一样,主要是会进行一个心跳是否超过最大时间的判断,是否为健康状态,是否需要删除等操作。
public void run() { //获取全部实例 List<Instance> instances = service.allIPs(true); for (Instance instance : instances) { //会对当前时间和上一次发送心跳时间做一个差,再合这个超时时间作对比 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (instance.isHealthy()) { instance.setHealthy(false); } } //删除 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()){ deleteIp(instance); } } }
四,心跳机制总结
就是在注册这个实例的时候,客户端就会创建一个心跳的实例,一起发送到这个服务端,并且与此同时,会开启一个线程去执行这个客户端给服务端发送心跳的的这个延迟队列线程。客户端注册到这个服务端之后,会开启一个延迟的线程池任务,在注册成功5s之后再发送这个心跳给服务端。服务端在接收到这个客户端的心跳之后,会对这些心跳做一个记录,并且也会开启这个都是任务,去查看这些全部的实例是否需要删除,是否处于健康状态等。