DefaultPublisher 事件发布者
public class DefaultPublisher extends Thread implements EventPublisher • 1
默认发布者的源码,查看以后可以发现它继承自 Thread,也就是它是一个线程类;同时,它又实现了 EventPublisher,也就是发布者
@Override public void init(Class<? extends Event> type, int bufferSize) { // 守护线程 setDaemon(true); // 设置线程名称 setName("nacos.publisher-" + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; // 阻塞队列初始化 this.queue = new ArrayBlockingQueue<>(bufferSize); start(); } @Override public synchronized void start() { if (!initialized) { // start just called once super.start(); if (queueMaxSize == -1) { queueMaxSize = ringBufferSize; } initialized = true; } }
看它的 init 初始化方法,从这里可以看出当 DefaultPublisher 被初始化时,是以守护线程的方式进行运作的,其中还初始化了一个阻塞队列,最后调用了 start 方法:在这其中调用了 super.start 方法启动了线程,接着看 run 方法的运行逻辑
@Override public void run() { openEventHandler(); } void openEventHandler() { try { // This variable is defined to resolve the problem which message overstock in the queue. int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when // waiting for the first Subscriber to register // 死循环延迟,线程启动最大延时 60 秒,这个主要是为了解决消息积压问题 for (; ; ) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } // 死循环不断的从队列中取出 Event,并通知订阅者 Subscriber 执行 Event for (; ; ) { if (shutdown) { break; } // 从队列中取出 Event final Event event = queue.take(); receiveEvent(event); UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : ", ex); } }
run 方法中调用 openEventHandler 方法,这里面写了两个死循环
- 第一个循环:可以理解为延时效果,也就是说线程启动时最大延时为 60 秒,在这 60 秒中每间隔一秒会判断「当前线程是否关闭、是否有订阅者、是否等待超过 60 秒」如果满足其中一个条件,就可以提前退出当前死循环
- 第二个循环:真正处理业务逻辑的,会从阻塞队列中取出一个事件,然后通过 receiveEvent 方法执行
队列中的事件从何而来?
@Override public boolean publish(Event event) { checkIsStart(); boolean success = this.queue.offer(event); if (!success) { LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); receiveEvent(event); return true; } return true; }
其实就是 DefaultPublisher#publish 发布事件方法被调用了,往阻塞队列中存入事件,如果存入失败,会直接调用 receiveEvent「直接理解为如果存入队列失败,则立即执行,不走队列了」
receiveEvent 方法
// 通知和通知订阅者进行事件处理 void receiveEvent(Event event) { if (!hasSubscriber()) { LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event); return; } // 通知订阅者执行 Event for (Subscriber subscriber : subscribers) { if (!subscriber.scopeMatches(event)) { continue; } // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. notifySubscriber(subscriber, event); } }
在这里就是遍历 subscribers(订阅者集合),然后通知订阅者执行事件,那么 subscribers 订阅者是从何而为的呢?这个还是要回到 NacosNamingService#init 方法中
// NacosNamingService#init // 将 Subsribe 注册到 Publisher 中 NotifyCenter.registerSubscriber(changeNotifier); // NotifyCenter#addSubscriber // NotifyCenter#registerSubscriber->NotifyCenter#addSubscriber private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType, EventPublisherFactory factory) { final String topic = ClassUtils.getCanonicalName(subscribeType); synchronized (NotifyCenter.class) { // MapUtils.computeIfAbsent is a unsafe method. MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize); } // 获取事件对应的发布者 EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher instanceof ShardedEventPublisher) { ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType); } else { // 添加到对应的 subscribers 集合 publisher.addSubscriber(consumer); } }
registerSubscriber 方法最终会调用 NotifyCenter#addSubscriber 方法「核心逻辑:将订阅的事件、发布者、订阅者三者关系进行绑定,发布者与事件通过 Map 进行维护,发布者与订阅者通过关联关系进行维护」
// DefaultPublisher.java @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); // 执行订阅事件 final Runnable job = () -> subscriber.onEvent(event); // 执行者,该订阅是否指定了线程池:如果有则拉起一个线程进行异步执行事件 final Executor executor = subscriber.executor(); if (executor != null) { executor.execute(job); } else { try { job.run(); } catch (Throwable e) { LOGGER.error("Event callback exception: ", e); } } }
总结
整体服务订阅的事件机制还是比较复杂的,因为用到了事件的形式,逻辑比较绕,并且其中还有守护线程、死循环、阻塞队列等;重点理解的是 NotifyCenter 对事件发布者、事件订阅者和事件之间关系的维护,而这一关系维护的入口就位于 NacosNamingService#init 方法当中,以下是核心流程:事件发布、事件处理
流程图如下分析:
ServiceInfoHolder 中通过 NotifyCenter 发布了 InstancesChangeEvent 事件
NotifyCenter 进行发布事件,核心逻辑如下:
- 通过 InstancesChangeEvent 事件类型,获取对应的 CanonicalName
- 把 CanonicalName 作为 Key,从 NotifyCenter#publisherMap 中获取到对应的事件发布者(EventPublisher)
- EventPublisher 将 InstancesChangeEvent 事件进行发布
InstancesChangeEvent 事件发布处理逻辑:
- 通过 EventPublisher 默认实现类 DefaultPublisher 进行 InstancesChangeEvent 事件发布
- DefaultPublisher 本身以守护线程的方式运行,在执行业务逻辑前,先判断该线程是否启动
- 如果启动,则将事件添加到 BlockingQueue 中,队列默认大小:16384
- 如果未启动,直接抛出异常结束
- 添加到队列失败,则直接调用 DefaultPublisher#receiveEvent 方法,接收事件并通知订阅者进行处理
- 通过订阅者创建一个 Runnable 对象,执行订阅者 Event,Event 事件便是执行订阅时传入的
添加到队列成功,则进入到事件的处理阶段
- DefaultPublisher 初始化时会创建一个阻塞(BlockingQueue)队列,并标记线程启动
- DefaultPublisher 本身继承于 Thread,当执行 super#start 方法时,会调用它的 run 方法
- run 方法核心业务逻辑是通过 openEventHandler 方法进行处理的
- openEventHandler 方法通过两个 for 循环,从阻塞队列中获取时间信息
- one.用于让线程启动时在 60s 内检查执行条件,如果条件满足则立即执行第二个死循环
- two.循环为死循环,从阻塞队列中获取 Event,并调用 DefaultPublisher#receiveEvent 方法,接收事件并通知订阅者进行处理
结尾
至此「Nacos 服务端注册、客户端服务发现源码分析」分析到这里,基本只需要掌握大致的脉路即可
欢迎大家在评论框分享您的看法,喜欢该文章帮忙给个赞👍和收藏,感谢!!
分享个人学习源码的几部曲
- 设计模式掌握为前提,程序员的内功修炼法,🙅不分语言
- 不要太追究于细节,捋清大致脉路即可;太过于追究于细节,你会越捋越乱
- 关注重要的类和方法、核心逻辑
- 掌握 Debug 技巧,在关键的类和方法多停留,多作分析和记录
更多技术文章可以查看:vnjohn 个人博客