本文主要介绍Spring事件流和@Async
异步线程池处理,以及@Async
默认线程池可能会导致的问题及解决方法。
事件流
Spring可以使用以观察者模式实现的事件流操作,将业务逻辑解耦,达到职责分离的效果
发布事件:
public class EmailService implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
public void sendEmail(String address, String content) {
publisher.publishEvent(new BlackListEvent(this, address, content));
// send email...
}
}
监听事件:
@EventListener(condition = "#blEvent.content == 'foo'")
public void processBlackListEvent(BlackListEvent blEvent) {
// notify appropriate parties via notificationAddress...
}
注意在默认情况下,事件监听器会同步接收事件。这意味着
publishEvent()
方法将阻塞,直到所有侦听器都已完成对事件的处理为止。
@Async
用@Async
注解bean
的一个方法,就会让它在一个单独的线程中执行。换句话说,调用者不会等待被调用方法的完成
@Async
有两个限制:
- 它必须仅应用于
public
方法 - 自调用(从同一个类中调用异步方法)将不起作用
原因:该方法需要为
public
才可以被代理。而自调用是不生效的,因为它绕过了代理,直接调用了底层方法。
异步返回参数
可以通过将实际返回包装在Future
中,将@Async
应用于具有返回类型的方法
@Async
public Future<String> asyncMethodWithReturnType() {
System.out.println("Execute method asynchronously - "
+ Thread.currentThread().getName());
try {
Thread.sleep(5000);
return new AsyncResult<String>("hello world !!!!");
} catch (InterruptedException e) {
//
}
return null;
}
Spring 还提供了一个实现Future
的AsyncResult
类。我们可以使用它来跟踪异步方法执行的结果。
现在让我们调用上述方法并使用Future
对象检索异步过程的结果。
public void testAsyncAnnotationForMethodsWithReturnType()
throws InterruptedException, ExecutionException {
System.out.println("Invoking an asynchronous method. "
+ Thread.currentThread().getName());
Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
while (true) {
if (future.isDone()) {
System.out.println("Result from asynchronous process - " + future.get());
break;
}
System.out.println("Continue doing something else. ");
Thread.sleep(1000);
}
}
异步监听器
如果要特定的侦听器异步处理事件,只需重用常规@Async
支持:
@EventListener
@Async
public void processBlackListEvent(BlackListEvent event) {
// BlackListEvent is processed in a separate thread
}
使用异步事件时,请注意以下限制:
- 如果事件监听器抛出
Exception
,它将不会传播给调用者,详见AsyncUncaughtExceptionHandler
- 此类事件监听器无法发送答复事件。如果您需要发送另一个事件作为处理结果,请注入
ApplicationEventPublisher
以手动发送事件。
@EventPublisher + @Async 阻塞
在@Async
注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor
线程池。
默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果
spring默认线程池配置参数
org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/**
* Configuration properties for task execution.
*
* @author Stephane Nicoll
* @since 2.1.0
*/
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";
public static class Pool {
/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* Core number of threads.
*/
private int coreSize = 8;
/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;
/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;
/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);
//getter/setter
}
}
自定义线程池
在@Async
注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize
,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
@Configuration
public class ThreadConfig {
@Bean("msgThread")
public ThreadPoolTaskExecutor getMsgSendTaskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(25);
taskExecutor.setQueueCapacity(800);
taskExecutor.setAllowCoreThreadTimeOut(false);
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.setThreadNamePrefix("msg-thread-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
监听事件异步处理
@EventListener(value = MsgEvent.class, condition = "#root.args[0].type == 0")
@Async("msgThread")
public void commonEvent(MsgEvent event) {
//logic
}
参考资料: