一、前言
在前面我们分析完了Nacos服务注册、服务发现的原理;当研究完Nacos作为服务配置中心时,是通过定时任务 + 长轮询
的方式实现配置信息的准实时
动态刷新;突发奇想那么服务实例信息变更,作为服务的消费者如何实时感知?所以有了今天这篇文章(以服务实例新增为例)。因为本篇文章和服务注册、服务发现源码紧密结合,大家可以先参考一下下面这两篇文章:
1、图文详述Nacos服务注册源码分析:https://blog.csdn.net/Saintmm/article/details/121981184
2、图文详述Nacos服务发现源码分析:https://blog.csdn.net/Saintmm/article/details/122019300
PS : Nacos Client版本:1.4.2。
二、服务注册、发现、UDP通知交互流程图
三、源码分析
针对服务注册、服务发现的源码此篇文章一带而过,细节大家可以参考上面提到的两篇文章。
1、服务注册
在进行服务注册的时候,会做三件事:增加对服务临时实例和永久实例监听的监听器、添加服务实例时添加CHANGE事件到通知器Notifier、由ApplicationContext事件机制处理CHANGE事件并采用UDP的方式通知Nacos Client(服务消费者)。
>1 增加服务监听器
当一个服务需要向Nacos Server注册自己时,并且之前没有注册过;此时会Nacos Server会创建一个Service,并且添加两个监听器,监听服务临时和永久实例的变更;
逻辑体现在ServiceManager#putServiceAndInit()方法中:
本篇文章我们以临时服务实例为例: 添加服务监听器即以namespaceId、serviceName和ephemeral为key,Service对象为value将键值对添加到DistroConsistencyServiceImpl
类的listeners
字段中。
>2 给通知器添加CHANGE事件
如果服务在Nacos Server中已经存在了,服务注册操作需要给服务添加实例信息,并且将服务实例信息变更事件放入到通知者Notifier的阻塞队列tasks中,以供相应服务的服务消费者(Nacos client)开启UDP推送服务后 将服务实例信息变更事件推送到服务消费者(Nacos client)。
逻辑体现在DistroConsistencyServiceImpl
#onPut()方法中:
1)添加任务实际是将CHANGE类型任务添加到DistroConsistencyServiceImpl
的内部类Notifier
的阻塞队列tasks中;
2)dataStore.put(key, datum)
操作会把服务的所有实例信息维护在DataStore
的dataMap中,以供服务变更事件监听器使用;
>3 采用UDP通知Nacos Client
DistroConsistencyServiceImpl初始化时会初始化notifier,并且在其后置构造方法(@PostConstruct)中会启动notifier任务。
在通知者Notifier的run()方法中会死循环从阻塞队列tasks
中取任务,做相应的事件处理;
处理变更事件时,新的value值从增加服务实例时维护在DataStore
中的dataMap中取;最后进入到Service#onChange()方法中;
updateIPs()方法主要做两件事:
1)会维护服务所在集群的实例信息clusterMap
,这里的集群也可以理解为区域Region上划分的几个机房,比如:南京机房、北京机房;
2)采用UDP的方式通知服务消费者(Nacos Client),消费的服务提供者实例信息发生变更;
调用的UdpPushService#serviceChanged()方法中大有乾坤,开源框架的魅力呀;这其中是我们熟悉的ApplicationContext事件机制,这里进行一个发布事件的操作;
其实吧,看到这里的时候我有点懵逼,于是就复习了一下ApplicationContext的事件机制;见博客:Spring ApplicationContext的事件机制是什么?在Nacos中如何应用?
我们这里再简要说一下ApplicationContext事件机制在Nacos中的应用。
(1)ApplicationContext的事件机制在Nacos中的应用
ServiceChangeEvent
为要发布的事件;UdpPushService
#serviceChanged()
方法中会调用applicationContext.publishEvent()
发布事件;UdpPushService
实现ApplicationListener
接口的onApplicationEvent()
方法处理事件。
总的来说UdpPushService类将ServiceChangeEvent事件通过applicationContext.publishEvent()
发布给自己实现的onApplicationEvent()
方法进行处理,这里是一个典型的观察者设计模式
。
(2)处理ServiceChangeEvent
1> 最后会通过DatagramSocket
发送UDP
请求:
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class UdpPushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
// todo 处理业务逻辑
// If upgrade to 2.0.X, do not push for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
.......
// 发送UDP请求
udpPush(ackEntry);
.......
}
2> 在处理ServiceChangeEvent事件时,有一个问题?从上面看到现在都没有提到Nacos Client(服务消费者),那么serviceChangeEvent应该发到哪里?
对于这个问题我们在下面继续聊。
2、服务发现
在当前流程中,服务发现主要相当于一个导火索,只有做了服务发现,Nacos Server 才知道要推送服务实例变更信息给哪个Nacos Client。
用编程逻辑来说,也就是Nacos Client做服务发现时,需要将自己的IP、Port等信息存储到Nacos Server中,当发生相应服务实例信息变更时,Nacos Server遍历它进行UDP通知。
>1 Nacos Client数据存储
在服务注册,我们最后聊到会在UdpPushService#onApplicationEvent()方法中处理ServiceChangeEvent,它在判断应该往哪些Nacos Client发送服务实例变更通知时,是通过遍历NamingSubscriberServiceV1Impl
的字段clientMap。
我们接着看一下做服务发现时是如何填充的clientMap?
>2 开启UDP推送服务、增加PushClient
在..... 一系列操作之后,具体操作见最开始提到的文章;
1)进入到InstanceOperatorServiceImpl#listInstance()
方法中:如果UDP推送服务可以开启进行Push操作,则会调用subscriberServiceV1.addClient()
增加一个PushClient。
2)subscriberServiceV1.addClient()
中会先初始化一个PushClient,然后将其放入到clientMap中;
对于clientMap而言,整体的交互流程如下图:
3、Nacos Client接收UDP通知
Nacos Server是通过udpSocket.send()方法发送通知的,感觉上Nacos Client应该是通过udpSocket.receive(
方法接收通知,搜一下看看;
我们进入到PushReceiver
类中,PushReceiver是一个线程类,在其run()方法死循环接收UDP通知,当通知类型是service
时,处理本地缓存的服务信息serviceInfoMap;ServiceInfoHolder#processServiceInfo()
方法中处理修改本地缓存相对就很简单了,直接调用Map.put()方法更新相应key的value值即可:
此处的整体流程如下图:
四、总结
- 做服务发现时会做两件事:
*1. 会对服务的永久实例和临时实例各注册一个监听器;
*2. 服务实例增加时触发一个服务实例信息CHANGE事件,由Notifier通知者通过UDP的方式通知Nacos Client(服务消费者);
- 服务注册时则会告诉Notifer通知者应该通知哪些Nacos Client;
- Nacos Client接收到UDP通知之后,更新本地缓存serviceInfoMap。