Spring异步事件机制剖析

简介: Spring异步事件机制剖析

同步&异步


  • 同步事件
    在一个线程里,所有的业务方法都是顺序执行的,存在上下依赖关系,其中一个环节耗时过长或阻塞会影响后续环节,且整体耗时增加也受到影响。
    网络异常,图片无法展示
    |
  • 异步事件
    在一个线程里,执行一个业务方法或逻辑,其他业务方法或逻辑通过异步线程进行并行执行,彼此独立不影响,可以充分利用多线程的优势提高并发,减少整体耗时。
    网络异常,图片无法展示
    |


实现原理


交互流程


网络异常,图片无法展示
|


  • publisher 事件发布器,这里是事件对象的发布入口
  • listener 事件监听器,这里是事件对象处理的最终对象
  • event 事件对象,事件数据的载体
  • applicationEventMultiCaster 事件多播器,它是连接发布器与监听器的桥梁和中转路由,负责将事件对象分发到具体的监听器上去


Spring中的事件机制是通过观察者模式来进行实现的,支持同步/异步两种方式。


Spring的事件机制提供了一种可以实现业务解耦的优雅编程方式,将实现剥离,使得实现细节更加具体和聚焦,在一定程度上便于后续维护,扩展性较好。


一般来说,我们使用异步方式进行,否则便和日常的同步调用没有太大区别,无法充分发挥异步线程带来的并行优势。

特点

作用

观察者模式

解耦

异步

并行化


初始化


关于Spring事件机制的初始化,其实主要是基于观察者模式进行发布器、监听器路由关系的绑定和建立,事件多播器也是依赖映射关系进行事件对象的分发实现具体监听器的处理。


Spring事件机制只是整个Spring环境中的一个组成部分,这里需要前置了解Spring环境的初始化方式和工作原理后再结合剖析事件机制的初始化就不难理解了。我们知道AbstractApplicationContext是Spring中环境的抽象基类,它的refresh()方法涵盖了所有整个Spring初始化流程,这里面就包含多播器的初始化方法initApplicationEventMulticaster()和监听器的绑定方法registerListeners()


public void refresh() throws BeansException, IllegalStateException {

       //。。。

    //省略其他

       synchronized(this.startupShutdownMonitor) {

           //。。。

           //省略其他

           try {

               //。。。

               //省略其他

               this.initApplicationEventMulticaster();

               this.registerListeners();

               //省略其他

               //。。。

           } catch (BeansException var9) {

               //。。。

               //省略其他

           } finally {

               //。。。

               //省略其他

           }

       }

   }


下面来看下多播器的初始化方法initApplicationEventMulticaster(),主要是从BeanFactory中判断是否已创建多播器,如果没有创建则创建默认的SimpleApplicationEventMulticaster作为多播器。


protected void initApplicationEventMulticaster() {

// 拿到当前BeanFactory,通过Bean工厂来获取多播器的Bean实例

       ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();

       // 如果存在多播器,则直接获取

       if (beanFactory.containsLocalBean("applicationEventMulticaster")) {

           this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);

           if (this.logger.isDebugEnabled()) {

               this.logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");

           }

       }

// 如果不存在多播器,则直接创建默认的SimpleApplicationEventMulticaster多播器进行Bean注册

else {

           this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);

           beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);

           if (this.logger.isDebugEnabled()) {

               this.logger.debug("Unable to locate ApplicationEventMulticaster with name 'applicationEventMulticaster': using default [" + this.applicationEventMulticaster + "]");

           }

       }

   }


监听器的注册方法registerListeners()


protected void registerListeners() {

// 获取所有ApplicationListener的迭代器

       Iterator var1 = this.getApplicationListeners().iterator();

// 把所有监听器都注册到当前的多播器上去

       while(var1.hasNext()) {

           ApplicationListener<?> listener = (ApplicationListener)var1.next();

           this.getApplicationEventMulticaster().addApplicationListener(listener);

       }


       String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false);

       String[] var7 = listenerBeanNames;

       int var3 = listenerBeanNames.length;


       for(int var4 = 0; var4 < var3; ++var4) {

           String listenerBeanName = var7[var4];

           this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);

       }

// 如果是早期事件,则直接遍历执行发布事件对象

       Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;

       this.earlyApplicationEvents = null;

       if (earlyEventsToProcess != null) {

           Iterator var9 = earlyEventsToProcess.iterator();


           while(var9.hasNext()) {

               ApplicationEvent earlyEvent = (ApplicationEvent)var9.next();

               this.getApplicationEventMulticaster().multicastEvent(earlyEvent);

           }

       }

   }


发布事件


发布事件是通过AbstractApplicationContextpublishEvent()方法发布的


public void publishEvent(Object event) {

publishEvent(event, null);

}


protected void publishEvent(Object event, @Nullable ResolvableType eventType) {

Assert.notNull(event, "Event must not be null");


// 判断事件类型是否为ApplicationEvent,如果不是则封装成PayloadApplicationEvent

ApplicationEvent applicationEvent;

if (event instanceof ApplicationEvent) {

applicationEvent = (ApplicationEvent) event;

}

else {

applicationEvent = new PayloadApplicationEvent<>(this, event);

if (eventType == null) {

  eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();

}

}


// 如果是Spring初始化的早期事件,则需要加入到早期事件中立即发布

if (this.earlyApplicationEvents != null) {

this.earlyApplicationEvents.add(applicationEvent);

}

else {

//如果不是早期事件,则通过多播器立即进行事件发布

getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);

}

 

//如果父类上下文存在,进行发布事件

if (this.parent != null) {

if (this.parent instanceof AbstractApplicationContext) {

  ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);

}

else {

  this.parent.publishEvent(event);

}

}

}


然后再来看下SimpleApplicationEventMulticastermulticastEvent()方法,这是多播器广播事件的方法


public void multicastEvent(ApplicationEvent event) {

//解析Event类型,进行事件发布

multicastEvent(event, resolveDefaultEventType(event));

}


public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {

// 获取到事件对应的解析类型

ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));

// 获取到多播器当前的线程池

Executor executor = getTaskExecutor();

// 获取当前应用中与给定事件类型匹配的ApplicationListeners的监听器集合,不符合的监听器会被排除在外。再循环执行

for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {

// 如果线程池不为空,则通过线程池异步执行

if (executor != null) {

  executor.execute(() -> invokeListener(listener, event));

}

else {

  // 否则由当前线程执行

  invokeListener(listener, event);

}

}

}


事件处理


事件处理由invokeListener()方法进行处理,该方法做了一层try、catch封装,实际方法在doInvokeListener()方法中的listener.onApplicationEvent(event)


protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {

ErrorHandler errorHandler = getErrorHandler();

if (errorHandler != null) {

try {

  doInvokeListener(listener, event);

}

catch (Throwable err) {

  errorHandler.handleError(err);

}

}

else {

doInvokeListener(listener, event);

}

}


@SuppressWarnings({"rawtypes", "unchecked"})

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {

try {

// 实际监听器接受该事件并处理

listener.onApplicationEvent(event);

}

catch (ClassCastException ex) {

String msg = ex.getMessage();

if (msg == null || matchesClassCastMessage(msg, event.getClass())) {

  Log logger = LogFactory.getLog(getClass());

  if (logger.isTraceEnabled()) {

  logger.trace("Non-matching event type for listener: " + listener, ex);

  }

}

else {

  throw ex;

}

}

}


关系匹配


当发布器发布事件对象后,会通过getApplicationListeners方法进行监听器的获取,返回的监听器集合进行遍历和类型解析,匹配到符合条件的监听器。


protected Collection<ApplicationListener<?>> getApplicationListeners(

ApplicationEvent event, ResolvableType eventType) {

// 最初发送事件的对象

Object source = event.getSource();

Class<?> sourceType = (source != null ? source.getClass() : null);

// 将给定事件类型与源类型进行封装

ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

// Quick check for existing entry on ConcurrentHashMap...

ListenerRetriever retriever = this.retrieverCache.get(cacheKey);

if (retriever != null) {

return retriever.getApplicationListeners();

}


if (this.beanClassLoader == null ||

  (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&

    (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {

// 完全同步构建和缓存ListenerRetriever

synchronized (this.retrievalMutex) {

  retriever = this.retrieverCache.get(cacheKey);

  if (retriever != null) {

  return retriever.getApplicationListeners();

  }

  retriever = new ListenerRetriever(true);

  //实际检索给定事件和源类型的应用程序监听器,将过滤后的监听器集合进行返回

  Collection<ApplicationListener<?>> listeners =

    retrieveApplicationListeners(eventType, sourceType, retriever);

  this.retrieverCache.put(cacheKey, retriever);

  return listeners;

}

}

else {

// No ListenerRetriever caching -> no synchronization necessary

return retrieveApplicationListeners(eventType, sourceType, null);

}

}


通过retrieveApplicationListeners()方法,进行实际检索给定事件和源类型的应用程序监听器的实现。通过supportsEvent()方法进行判断监听器是否支持给定事件。


private Collection<ApplicationListener<?>> retrieveApplicationListeners(

ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {


List<ApplicationListener<?>> allListeners = new ArrayList<>();

//当前应用中的监听器集合

Set<ApplicationListener<?>> listeners;

//当前应用中的监听器BeanName集合

Set<String> listenerBeans;

synchronized (this.retrievalMutex) {

listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);

listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);

}

for (ApplicationListener<?> listener : listeners) {

// 判断监听器是否支持给定事件

if (supportsEvent(listener, eventType, sourceType)) {

  if (retriever != null) {

  retriever.applicationListeners.add(listener);

  }

  allListeners.add(listener);

}

}

if (!listenerBeans.isEmpty()) {

BeanFactory beanFactory = getBeanFactory();

for (String listenerBeanName : listenerBeans) {

  try {

  //根据监听器的BeanName获取到对应类型

  Class<?> listenerType = beanFactory.getType(listenerBeanName);

  if (listenerType == null || supportsEvent(listenerType, eventType)) {

    ApplicationListener<?> listener =

      beanFactory.getBean(listenerBeanName, ApplicationListener.class);

    //判断监听器是否支持给定事件

    if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {

    if (retriever != null) {

      if (beanFactory.isSingleton(listenerBeanName)) {

      retriever.applicationListeners.add(listener);

      }

      else {

      retriever.applicationListenerBeans.add(listenerBeanName);

      }

    }

    allListeners.add(listener);

    }

  }

  }

  catch (NoSuchBeanDefinitionException ex) {

  // Singleton listener instance (without backing bean definition) disappeared -

  // probably in the middle of the destruction phase

  }

}

}

AnnotationAwareOrderComparator.sort(allListeners);

if (retriever != null && retriever.applicationListenerBeans.isEmpty()) {

retriever.applicationListeners.clear();

retriever.applicationListeners.addAll(allListeners);

}

return allListeners;

}


配置方式


开启异步


  • 注解方式


/**

* created by guanjian on 2021/4/7 9:36

*/

@Configuration

@EnableAsync

public class AsyncConfig{

// 这里还可以配置其他更多可选项,如异步线程池等

}


  • XML方式


<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"

      xmlns:aop="http://www.springframework.org/schema/aop"

      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/taskhttp://www.springframework.org/schema/task/spring-task.xsdhttp://www.springframework.org/schema/aophttps://www.springframework.org/schema/aop/spring-aop.xsd">


   <!--开启注解调度支持 等同于@Async注解 这里executor是指定线程池,有下面两种异步线程池的配置方式 -->

   <task:annotation-driven />


</beans>


<task:annotation-driven "/>是开启异步,等同于@Async注解的作用。


多播器定义


<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">

       <property name="taskExecutor" ref="executor" />

</bean>


如果这里不定义也可以,会在初始化阶段创建SimpleApplicationEventMulticaster来作为默认的多播器。


异步线程定义


  • 注解方式


/**

* created by guanjian on 2021/4/7 9:36

*/

@Configuration

@EnableAsync

public class AsyncConfig implements AsyncConfigurer {

   /**

    * 这里手动注入异步线程池

    */

   @Override

   public Executor getAsyncExecutor() {

       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

       //核心线程池数量

       executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());

       //最大线程数量

       executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5);

       //线程池的队列容量

       executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 2);

       //线程名称的前缀

       executor.setThreadNamePrefix("async-executor-");

       executor.initialize();

       return executor;

   }


   /**

    * 这里注入异步未知错误捕获处理

    */

   @Override

   public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

       return new SimpleAsyncUncaughtExceptionHandler();

   }

}


实现AsyncConfigurer接口完成异步调用的相关配置,getAsyncExecutor是配置异步线程池,getAsyncUncaughtExceptionHandler是配置异步未知错误捕获处理。


  • XML方式


<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"

      xmlns:aop="http://www.springframework.org/schema/aop"

      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/taskhttp://www.springframework.org/schema/task/spring-task.xsdhttp://www.springframework.org/schema/aophttps://www.springframework.org/schema/aop/spring-aop.xsd">


   <!-- 开启@AspectJ AOP代理 -->

   <aop:aspectj-autoproxy proxy-target-class="true" expose-proxy="true"/>

   <!--开启注解调度支持 等同于@Async注解 这里executor是指定线程池,有下面两种异步线程池的配置方式 -->

   <task:annotation-driven executor="executor" proxy-target-class="true"/>


   <!-- 配置方式1:异步线程池 -->

   <task:executor id="executor" pool-size="10" keep-alive="3000" queue-capacity="200" rejection-policy="CALLER_RUNS"/>

   <!-- 配置方式2:异步线程池 -->

   <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

       <property name="corePoolSize" value="5"/>

       <property name="keepAliveSeconds" value="3000"/>

       <property name="maxPoolSize" value="50"/>

       <property name="queueCapacity" value="200"/>

   </bean>

</beans>


<task:annotation-driven executor=“executor” proxy-target-class=“true”/>是开启异步,等同于@Async注解的作用,executor属性用来指定异步线程池,上面的示例中配置了两种异步线程池,指定哪一个都可以。
使用<task:executor />这种,配置风格一致更清晰,它是线程池在异步事件机制中的特殊schema支持,参数有限。
使用标准的Bean对org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor进行配置的话,可以支持最全面属性配置。


关于异步线程池的配置,可以通过代码分析看细节,这里先说结论。


  • 在开启了异步的情况下,没有配置、且没有指定任何线程池则自动调用内部实现SimpleAsyncTaskExecutor线程池进行异步执行
  • 如果配置了org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor线程池,即时没指定也会默认以此线程池进行异步执行
  • 如果指定了异步线程池,则按照指定的异步线程池进行异步执行,与其他线程池隔离,互不干扰


事件定义


/**

* created by guanjian on 2021/4/6 17:46

*/

public class EventObject extends ApplicationEvent {


   public EventObject(Object source) {

       super(source);

   }

}


继承ApplicationEvent类进行事件对象的扩展,该对象用来事件对象数据的传递,由publisher发布对象事件,由指定的listener来进行监听接收。


事件发布


/**

* created by guanjian on 2021/4/6 17:40

*/

@Component

public class AsyncPublisher implements ApplicationEventPublisherAware {


   private final static Logger LOGGER = LoggerFactory.getLogger(AsyncPublisher.class);


   @Resource

   private ApplicationEventPublisher publisher;


   @Override

   public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {

       this.publisher = applicationEventPublisher;

   }


   public void publish(EventObject eventObject) {

       LOGGER.info("Thread={} 发布事件 source={}",Thread.currentThread().getName(), eventObject.getSource());

       this.publisher.publishEvent(eventObject);

   }

}


实现ApplicationEventPublisherAware接口的方法,将ApplicationEventPublisher注入并通过它来调用publishEvent()方法进行事件发布。


事件监听


/**

* created by guanjian on 2021/4/6 17:42

*/

@Component

public class AsyncListener implements ApplicationListener<EventObject> {


   private final static Logger LOGGER = LoggerFactory.getLogger(AsyncListener.class);


   @Async

   @Override

   public void onApplicationEvent(EventObject eventObject) {

       LOGGER.info("Thread={} 监听事件 source={}", Thread.currentThread().getName(), eventObject.getSource());

   }

}


@Async注解是开启方法异步执行,不开启则是同步执行


注意: 无论是publisher还是listener都要纳入spring的管理才可以生效


失效原因


  • 没有通过@EnableAsync注解开启异步支持,或没有通过xml配置<task:annotation-driven />开启异步支持
  • 监听方法没有加@Async注解
  • 发布器(publisher)、监听器(listener)都要参与到Spring的管理中来,检查是否忘记@Component注解标记


实战举例


基于MQ消息实现的,基于用户信息变化事件的异步分发


关系拓扑


网络异常,图片无法展示
|


目录结构


│ MqEvent.java // 事件发布对象
│ MqEventEnums.java // 事件枚举
│ MqEventListener.java // 事件监听器
│ MqEventPublisher.java // 事件发布器

├─handler
│ UserInfoInvalidEventHandler.java // 失效事件处理器
│ UserInfoModifyEventHandler.java // 修改事件处理器
│ UserInfoValidEventHandler.java // 生效事件处理器

├─holder
│ MqEventHolder.java // 事件枚举与事件处理器关系容器

└─vo
UserInfoInValidEventVo.java // 失效事件对象
UserInfoModifyEventVo.java // 修改事件对象
UserInfoValidEventVo.java // 生效事件对象


优点&缺点


优点


  • 业务解耦
  • 异步化处理,减少同步执行阻塞


缺点


  • 知识盲区多,需要深刻理解实现机制和原理,结合Spring配置进行使用,如果忽略底层实现不但起不到作用,反而适得其反
  • 由于是异步线程并行处理,不适合前后依赖的业务逻辑,也不适合做事务特性操作的逻辑处理


扩展

CompletableFuture


参考

Spring的事件机制详解
Spring之事件机制详解
Spring异步编程 | 你的@Async就真的异步吗?异步历险奇遇记

相关文章
|
2月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
前端开发 Java API
异步编程 - 11 Spring WebFlux的异步非阻塞处理2
异步编程 - 11 Spring WebFlux的异步非阻塞处理2
174 0
|
7月前
|
JavaScript Java API
spring boot使用异步多线程
一文讲清楚spring boot如何结合异步多线程实现文件的导出这类耗时间的操作优化以及常用的场景,了解异步思想
141 0
spring boot使用异步多线程
|
前端开发 Java API
异步编程 - 11 Spring WebFlux的异步非阻塞处理
异步编程 - 11 Spring WebFlux的异步非阻塞处理
485 0
|
消息中间件 Java Spring
Spring事件监听机制使用和原理解析
今天来分享一下Spring的事件监听机制,之前分享过一篇Spring监听机制的使用,今天从原理上进行解析,Spring的监听机制基于观察者模式,就是就是我们所说的发布订阅模式,这种模式可以在一定程度上实现代码的解耦,如果想要实现系统层面的解耦,那么消息队列就是我们的不二选择,消息队列本身也是发布订阅模式,只是不同的消息队列的实现方式不一样。
109 0
|
Java Spring 容器
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析
126 0
|
Java 程序员 容器
Spring5源码 - 10 Spring事件监听机制_应用篇
Spring5源码 - 10 Spring事件监听机制_应用篇
233 0
|
Java 测试技术 Spring
Spring boot如何实现异步调用
Spring boot如何实现异步调用
|
XML 缓存 Java
Spring异步事件机制剖析
Spring异步事件机制剖析
224 0
|
设计模式 数据可视化 Java
Spring 事件处理机制详解,带你吃透 Spring 事件
前言 Spring 事件处理基于 Java 观察者模式扩展。Spring 应用上下文中发布了各种事件,此外 Spring 还允许我们发送和处理自定义的事件,本篇将对 Spring 的事件机制使用及其实现进行详细介绍。
1336 0
Spring 事件处理机制详解,带你吃透 Spring 事件