1EventDispatcher
EventDispatcher在Nacos中是一个事件发布与订阅的类,也就是我们经常使用的Java设计模式——观察者模式
一般发布与订阅主要有三个角色
- 事件: 表示某些类型的事件动作,例如Nacos中的 本地数据发生变更事件
LocalDataChangeEvent
- 事件源 : 事件源可以看成是一个动作,某个事件发生的动作,例如Nacos中本地数据发生了变更,就会通知给所有监听该事件的监听器
- 事件监听器: 事件监听器监听到事件源之后,会执行自己的一些业务处理,监听器必须要有回调方法供事件源回调
一个监听器可以监听多个事件,一个事件也可以被多个监听器监听
那我们看看这个类中的角色
2事件
Event
/**事件定义接口,所有事件继承这个空接口**/ public interface Event { }
LocalDataChangeEvent
/** * 本地数据发生变更的事件。 * @author Nacos */ public class LocalDataChangeEvent implements Event { }
事件监听器
AbstractEventListener
/**抽象事件监听器; 每个监听器需要实现onEvent()处理事件,和interest()将要监听的事件列表**/ static public abstract class AbstractEventListener { public AbstractEventListener() { /*自动注册到*/ EventDispatcher.addEventListener(this); } /**感兴趣的事件列表**/ abstract public List<Class<? extends Event>> interest(); /**处理事件**/ abstract public void onEvent(Event event); }
LongPollingService
/** * 长轮询服务。负责处理 * * @author Nacos */ @Service public class LongPollingService extends AbstractEventListener { @Override public List<Class<? extends Event>> interest() { List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>(); eventTypes.add(LocalDataChangeEvent.class); return eventTypes; } @Override public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } } }
事件分发类
EventDispatcher
public class EventDispatcher { /**事件与事件监听器的数据中心; 一个事件可以对应着多个监听器**/ static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>(); /** * 新增监听器 */ static public void addEventListener(AbstractEventListener listener) { for (Class<? extends Event> type : listener.interest()) { getEntry(type).listeners.addIfAbsent(listener); } } /** * 事件源调用的接口动作,告知某个事件发生了 */ static public void fireEvent(Event event) { if (null == event) { throw new IllegalArgumentException(); } for (AbstractEventListener listener : getEntry(event.getClass()).listeners) { try { listener.onEvent(event); } catch (Exception e) { log.error(e.toString(), e); } } }
事件源
例如当删除配置文件的时候,就需要触发本地数据变更事件,则需要调用
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
调用了fireEvent之后所有监听这个Event的监听器都将执行listener.onEvent(event);
事件发布与订阅的使用方法有很多,但是基本模式都是一样的---观察者模式; 我们介绍一下其他的用法
3Google Guava 中的EventBus
EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。
EventBucket
我们自定义一个类EventBucket,来初始化及注册一些监听器(订阅者)
@Component public class EventBucket { private static Logger logger = LoggerFactory.getLogger(EventBucket.class); /**事件总线**/ private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5)); private static AtomicBoolean isInit = new AtomicBoolean(false); private final List<AsyncListener> asyncListenerList; /**将所有类型为AsyncListener的监听器注入**/ @Autowired public EventBucket(List<AsyncListener> asyncListenerList) { this.asyncListenerList = asyncListenerList; } @PostConstruct public synchronized void init() { //要将所有的事件监听者都在 EventBus中去注册 if (isInit.compareAndSet(false, true)) { asyncListenerList.forEach(a -> asyncEventBus.register(a)); } } /**发送事件**/ public static void post(BaseEvent event) { try { asyncEventBus.post(event); } catch (Throwable e) { logger.error("EventBucket发送事件出错: " + e.getMessage(), e); } } }
BaseEvent
定义BaseEvent 这个类有个post方法,用来发送事件的;所有的**事件必须继承此类
public class BaseEvent { public void post(){ EventBucket.post(this); } }
AsyncListener
定义一个监听器空接口,所有继承此接口的监听器类都将被注册到EventBus中;
public interface AsyncListener { }
上面定义好了基本的类,那我们下面测试怎么使用发布以及订阅 首先订阅一个事件 TestEvent
public class TestEvent extends BaseEvent { private Integer id; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } }
然后定义一个监听器 TestAsyncListener
@Component public class TestAsyncListener implements AsyncListener { @Subscribe public void testEvent(TestEvent testEvent){ System.out.print("我是TestAsyncListener接收到了TestEvent通知"+testEvent.toString()); } @Subscribe public void test2Event(Test2Event test2Event){ System.out.print("我是TestAsyncListener接收到了test2Event通知"+test2Event.toString()); } }
用注解@Subscribe
就可以直接订阅事件了; 那么接下来开始发送一个事件;我们再SpringBoot启动完之后调用一下发送事件通知
@SpringBootApplication public class ClientsApplication { public static void main(String[] args) { SpringApplication.run(ClientsApplication.class, args); TestEvent event = new TestEvent(); event.setId(1); //发布通知 event.post(); } }
启动完成之后,立马打印了
4Spring事件驱动机制
这篇博客写的比较详细可以前往阅读Spring事件驱动机制
5Nacos中使用的监听扩展接口
- [ ] SpringApplicationRunListener
- [ ] ApplicationListener
SpringApplicationRunListener
SpringApplicationRunListener 接口的作用主要就是在Spring Boot 启动初始化的过程中可以通过SpringApplicationRunListener接口回调来让用户在启动的各个流程中可以加入自己的逻辑。 它也是 观察者模式,Spring为我们提供了这个监听器的扩展接口;它监听的就是SpringBoot启动初始化中下面的各个事件
SpringBoot启动过程的关键事件(按照触发顺序)包括:1. 开始启动2. Environment构建完成3. ApplicationContext构建完成4. ApplicationContext完成加载5. ApplicationContext完成刷新并启动6. 启动完成7. 启动失败
package org.springframework.boot; public interface SpringApplicationRunListener { // 在run()方法开始执行时,该方法就立即被调用,可用于在初始化最早期时做一些工作 void starting(); // 当environment构建完成,ApplicationContext创建之前,该方法被调用 void environmentPrepared(ConfigurableEnvironment environment); // 当ApplicationContext构建完成时,该方法被调用 void contextPrepared(ConfigurableApplicationContext context); // 在ApplicationContext完成加载,但没有被刷新前,该方法被调用 void contextLoaded(ConfigurableApplicationContext context); // 在ApplicationContext刷新并启动后,CommandLineRunners和ApplicationRunner未被调用前,该方法被调用 void started(ConfigurableApplicationContext context); // 在run()方法执行完成前该方法被调用 void running(ConfigurableApplicationContext context); // 当应用运行出错时该方法被调用 void failed(ConfigurableApplicationContext context, Throwable exception); }
StartingSpringApplicationRunListener
- 在这个监听类中,主要是做了一些系统属性的设置;如:
nacos.mode=stand alone / cluster nacos.function.mode=All/config/naming nacos.local.ip=InetUtils.getSelfIp()
@Override public void environmentPrepared(ConfigurableEnvironment environment) { if (STANDALONE_MODE) { System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone"); } else { System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster"); } if (FUNCTION_MODE == null) { System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All"); } else if(SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)){ System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG); } else if(SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) { System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING); } System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP); }
- 还有顺便再启动结束之前,每秒中打印一次日志 Nacos is starting...
ApplicationListener
ApplicationListener 就是spring的监听器,能够用来监听事件,典型的观察者模式
@FunctionalInterface public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { /** * Handle an application event. * @param event the event to respond to */ void onApplicationEvent(E event); }
ApplicationEvent
是事件的抽象类; 具体的事件必须继承这个类;
Nacos中
StandaloneProfileApplicationListener
public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, PriorityOrdered { private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class); @Override public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) { ConfigurableEnvironment environment = event.getEnvironment(); if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) { environment.addActiveProfile(STANDALONE_SPRING_PROFILE); } if (logger.isInfoEnabled()) { logger.info("Spring Environment's active profiles : {} in standalone mode : {}", Arrays.asList(environment.getActiveProfiles()), STANDALONE_MODE ); } } @Override public int getOrder() { return HIGHEST_PRECEDENCE; } }
这里的监听器的泛型传的ApplicationEnvironmentPreparedEvent
这个事件是SpringBoot内置的事件;
SpringApplication启动并且Environment首次可用于检查和修改时发布的事件,也就是说通过ApplicationEnvironmentPreparedEvent可以拿到Environment的属性;
在这里这个监听器的作用就是拿到 ConfigurableEnvironment
,然后如果是单机模式standalone
就设置一下ActiveProfile
6EnvironmentPostProcessor加载外部配置文件
SpringBoot支持动态的读取文件,留下的扩展接口
org.springframework.boot.env.EnvironmentPostProcessor
。这个接口是spring包下的,使用这个进行配置文件的集中管理,而不需要每个项目都去配置配置文件。
NacosDefaultPropertySourceEnvironmentPostProcessor加载Nacos配置文件
加载这个类是加载core模块下面的META-INF/nacos-default.properties
配置文件的;
public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered { @Override public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { ResourceLoader resourceLoader = getResourceLoader(application); processPropertySource(environment, resourceLoader); } }
7Spring Factories SPI扩展机制
Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的
简单的总结下java SPI机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制;
在Dubbo中也定义了SPI机制;
在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。
这种自定义的SPI机制是Spring Boot Starter实现的基础。
我们看上面说的Nacos中的几个类,并没有打上@Component等等Spring中的注解,没有这些注解那么他们是怎么被加载到Spring容器中被管理的呢?
我们打开文件 core/resources/META-INF/spring.factories
# ApplicationListener org.springframework.context.ApplicationListener=\ com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener # EnvironmentPostProcessor org.springframework.boot.env.EnvironmentPostProcessor=\ com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor # SpringApplicationRunListener org.springframework.boot.SpringApplicationRunListener=\ com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\ com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener
上面提及到的几个类的全类名都在这个文件中;
- Spring Factories实现原理是什么 spring-core包里定义了SpringFactoriesLoader类,这个类实现了检索META-INF/spring.factories文件,并获取指定接口的配置的功能
具体的实现原理,后面可以再写一篇文章;