@EventPublisher + @Async 异步事件流详解

简介: 本文主要介绍Spring事件流和`@Async`异步线程池处理,以及`@Async`默认线程池可能会导致的问题及解决方法。在@Async注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险

本文主要介绍Spring事件流@Async异步线程池处理,以及@Async默认线程池可能会导致的问题及解决方法。

事件流

Spring可以使用以观察者模式实现的事件流操作,将业务逻辑解耦,达到职责分离的效果

Spring事件流的详解

发布事件:

public class EmailService implements ApplicationEventPublisherAware {
   
    private ApplicationEventPublisher publisher;

    public void sendEmail(String address, String content) {
   
         publisher.publishEvent(new BlackListEvent(this, address, content));
        // send email...
    }
}

监听事件:

@EventListener(condition = "#blEvent.content == 'foo'")
public void processBlackListEvent(BlackListEvent blEvent) {
   
    // notify appropriate parties via notificationAddress...
}

注意在默认情况下,事件监听器会同步接收事件。这意味着publishEvent()方法将阻塞,直到所有侦听器都已完成对事件的处理为止。

@Async

@Async注解bean的一个方法,就会让它在一个单独的线程中执行。换句话说,调用者不会等待被调用方法的完成

@Async有两个限制:

  1. 它必须仅应用于public方法
  2. 自调用(从同一个类中调用异步方法)将不起作用

原因:该方法需要为public才可以被代理。而自调用是不生效的,因为它绕过了代理,直接调用了底层方法。

异步返回参数

可以通过将实际返回包装在Future中,将@Async应用于具有返回类型的方法

示例详见How To Do @Async in Spring

@Async
public Future<String> asyncMethodWithReturnType() {
   
    System.out.println("Execute method asynchronously - " 
      + Thread.currentThread().getName());
    try {
   
        Thread.sleep(5000);
        return new AsyncResult<String>("hello world !!!!");
    } catch (InterruptedException e) {
   
        //
    }

    return null;
}

Spring 还提供了一个实现FutureAsyncResult类。我们可以使用它来跟踪异步方法执行的结果。

现在让我们调用上述方法并使用Future对象检索异步过程的结果。

public void testAsyncAnnotationForMethodsWithReturnType()
  throws InterruptedException, ExecutionException {
   
    System.out.println("Invoking an asynchronous method. " 
      + Thread.currentThread().getName());
    Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();

    while (true) {
   
        if (future.isDone()) {
   
            System.out.println("Result from asynchronous process - " + future.get());
            break;
        }
        System.out.println("Continue doing something else. ");
        Thread.sleep(1000);
    }
}

异步监听器

如果要特定的侦听器异步处理事件,只需重用常规@Async支持:

@EventListener
@Async
public void processBlackListEvent(BlackListEvent event) {
   
    // BlackListEvent is processed in a separate thread
}

使用异步事件时,请注意以下限制:

  • 如果事件监听器抛出Exception,它将不会传播给调用者,详见AsyncUncaughtExceptionHandler
  • 此类事件监听器无法发送答复事件。如果您需要发送另一个事件作为处理结果,请注入ApplicationEventPublisher以手动发送事件。

@EventPublisher + @Async 阻塞

@Async注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor线程池。

默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果

spring默认线程池配置参数org.springframework.boot.autoconfigure.task.TaskExecutionProperties

/**
 * Configuration properties for task execution.
 *
 * @author Stephane Nicoll
 * @since 2.1.0
 */
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
   

    private final Pool pool = new Pool();

    /**
     * Prefix to use for the names of newly created threads.
     */
    private String threadNamePrefix = "task-";

    public static class Pool {
   

        /**
         * Queue capacity. An unbounded capacity does not increase the pool and therefore
         * ignores the "max-size" property.
         */
        private int queueCapacity = Integer.MAX_VALUE;

        /**
         * Core number of threads.
         */
        private int coreSize = 8;

        /**
         * Maximum allowed number of threads. If tasks are filling up the queue, the pool
         * can expand up to that size to accommodate the load. Ignored if the queue is
         * unbounded.
         */
        private int maxSize = Integer.MAX_VALUE;

        /**
         * Whether core threads are allowed to time out. This enables dynamic growing and
         * shrinking of the pool.
         */
        private boolean allowCoreThreadTimeout = true;

        /**
         * Time limit for which threads may remain idle before being terminated.
         */
        private Duration keepAlive = Duration.ofSeconds(60);

        //getter/setter
        }
}

自定义线程池

@Async注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  1. ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
  2. ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  4. ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
@Configuration
public class ThreadConfig {
   

    @Bean("msgThread")
    public ThreadPoolTaskExecutor getMsgSendTaskExecutor(){
   
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(25);
        taskExecutor.setQueueCapacity(800);
        taskExecutor.setAllowCoreThreadTimeOut(false);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setThreadNamePrefix("msg-thread-");
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

监听事件异步处理

@EventListener(value = MsgEvent.class, condition = "#root.args[0].type == 0")
@Async("msgThread")
public void commonEvent(MsgEvent event) {
   
    //logic
}

@Async使用自定义线程池的其他方式


参考资料:

  1. Spring事件流
  2. @Async优化
  3. How To Do @Async in Spring
  4. Spring使用@Async注解
相关文章
|
Java Spring
RestTemplate上传文件解决方案
当对接文件上传模块时,需要对接上传文件的接口,而我们模块的数据是以字节数组存在的(已经操作过了的字节数组,存在于内存中)接口是以form-data的形式上传的,其中需要上传MultipartFIle,如果使用MultipartFile放入到请求的 fromMap中,然后再上传这个文件,会报(ByteArrayInputStream no serialized)的错误,也就是没有注入对应的bean的错误。。
5201 0
|
Java Spring
运行@Async注解的方法的线程池
自定义@Async注解线程池
499 3
|
存储 Java Spring
@Around 可以获取程序执行后的返回值吗
【8月更文挑战第13天】@Around 可以获取程序执行后的返回值吗
440 2
|
运维 监控 Java
使用jps命令查看Java进程
`jps`是Java开发者和系统管理员的得力助手,它简化了Java进程监控的过程,使得快速检查应用运行状态变得轻而易举。通过合理利用其提供的参数,可以高效地进行故障排查、性能监控及日常管理任务,确保Java应用稳定运行。
1052 2
|
XML Java 测试技术
Graalvm 替代 JVM 真的可以带来巨大的性能优势吗?
介绍 Spring Boot有助于轻松开发独立的、可用于生产的 Spring 应用程序。它对 Spring 平台和第三方库采用固执己见的方法:以最少的配置简化设置过程。优势: 易于使用:Spring Boot 简化了独立 Spring 应用程序的创建,无需复杂的配置。 嵌入式服务器:它允许直接嵌入 Tomcat、Jetty 或 Undertow 等服务器,从而无需单独部署 WAR 文件。 Starter 依赖项:Spring Boot 提供预配置的“starter”依赖项,降低了构建配置的复杂性。 自动配置:Spring Boot 自动配置 Spring 和第三方库,最大限度地减少手动设置工
|
SQL 缓存 监控
MySQL慢查询:慢SQL定位、日志分析与优化方案,真心不错!
MySQL慢查询:慢SQL定位、日志分析与优化方案,真心不错!
MySQL慢查询:慢SQL定位、日志分析与优化方案,真心不错!
|
Java 关系型数据库 MySQL
Spring Boot实现第一次启动时自动初始化数据库
本文以Spring Boot + Mybatis为例,使用MySQL数据库,实现了SSM应用程序第一次启动时自动检测并完成数据库初始化的功能,理论上上述方式适用于所有的关系型数据库,大家稍作修改即可。
1000 0
Spring Boot实现第一次启动时自动初始化数据库
|
SQL 关系型数据库 PostgreSQL
|
数据采集 监控 Java
Spring Boot拦截器:精细化控制请求与响应
本篇详细介绍了在Spring Boot中使用拦截器的方法。拦截器是一种强大的机制,可用于在请求处理前后进行操作,如鉴权、日志记录等。文章涵盖了创建拦截器类、注册拦截器以及实际应用案例。通过具体的代码示例,读者可以了解如何在项目中配置和使用拦截器,以实现各种功能需求。拦截器为Spring Boot应用增加了更多的灵活性和可扩展性,能够提升应用的安全性和可维护性。
3287 0
Spring Boot拦截器:精细化控制请求与响应