Spring 应用中实现异步
Spring为任务调度与异步方法执行提供了注解支持。通过在方法或类上设置 @Async 注解,可使得方法被异步调用。调用者会在调用时立即返回,而被调用方法的实际执行是交给 Spring 的 TaskExecutor 来完成的。所以被注解的方法被调用的时候,会在新的线程中执行,而调用它的方法会在原线程中执行,这样可以避免阻塞,以及保证任务的实时性。
简单回顾相关配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> 复制代码
@EnableAsync添加配置类
入口类增加了 @EnableAsync 注解,主要是为了扫描范围包下的所有 @Async 注解。
异步调用,通过开启新的线程调用的方法,不影响主线程。异步方法实际的执行交给了 Spring 的 TaskExecutor 来完成。
Future获取异步执行的结果
- 可以发现主调用方法并没有等到调用方法执行完就结束了当前的任务,如果想要知道调用的三个方法全部执行完该怎么办呢,下面就可以用到异步回调。
- 异步回调就是让每个被调用的方法返回一个 Future 类型的值,Spring中提供了一个 Future 接口的子类:AsyncResult,所以我们可以返回 AsyncResult 类型的值。
public class AsyncResult<V> implements ListenableFuture<V> { private final V value; private final ExecutionException executionException; //... } 复制代码
AsyncResult实现了ListenableFuture接口,该对象内部有两个属性:返回值和异常信息。
public interface ListenableFuture<T> extends Future<T> { void addCallback(ListenableFutureCallback<? super T> var1); void addCallback(SuccessCallback<? super T> var1, FailureCallback var2); } 复制代码
ListenableFuture接口继承自Future,在此基础上增加了回调方法的定义。Future 接口定义如下:
public interface Future<V> { // 是否可以打断当前正在执行的任务 boolean cancel(boolean mayInterruptIfRunning); // 任务取消的结果 boolean isCancelled(); // 异步方法中最后返回的那个对象中的值 V get() throws InterruptedException, ExecutionException; // 用来判断该异步任务是否执行完成,如果执行完成,则返回 true,如果未执行完成,则返回false boolean isDone(); // 与 get() 一样,只不过这里参数中设置了超时时间 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 复制代码
- get()方法,在执行的时候是需要等待回调结果的,阻塞等待。如果不设置超时时间,它就阻塞在那里直到有了任务执行完成。我们设置超时时间,就可以在当前任务执行太久的情况下中断当前任务,释放线程,这样就不会导致一直占用资源。
- cancel(boolean) 方法,参数是一个 boolean 类型的值,用来传入是否可以打断当前正在执行的任务。如果参数是 true 且当前任务没有执行完成 ,说明可以打断当前任务,那么就会返回 true;
- 如果当前任务还没有执行,那么不管参数是 true 还是 false,返回值都是 true;
- 如果当前任务已经完成,那么不管参数是 true 还是 false,那么返回值都是 false;
- 如果当前任务没有完成且参数是 false,那么返回值也是 false。
意思就是:
- 如果任务还没执行,那么如果想取消任务,就一定返回 true,与参数无关。
- 如果任务已经执行完成,那么任务一定是不能取消的,所以此时返回值都是false,与参数无关。
- 如果任务正在执行中,那么此时是否取消任务就看参数是否允许打断(true/false)。
获取异步方法返回值的实现
public Future<String> test() throws Exception { log.info("开始做任务"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任务,耗时:" + (end - start) + "毫秒"); return new AsyncResult<>("任务完成,耗时" + (end - start) + "毫秒"); } 复制代码
我们将 task 方法的返回值改为 Future,将执行的时间拼接为字符串返回。
@GetMapping("/task") public String taskExecute() { try { Future<String> r1 = taskService.test(); Future<String> r2 = taskService.test(); Future<String> r3 = taskService.test(); while (true) { if (r1.isDone() && r2.isDone() && r3.isDone()) { log.info("execute all tasks"); break; } Thread.sleep(200); } log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get()); } catch (Exception e) { log.error("error executing task for {}",e.getMessage()); } return "ok"; } 复制代码
另一种异步回调结果获取实现
统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。
也可以使用CompletableFuture来返回异步调用的结果
@Async public CompletableFuture<String> doTaskOne() throws Exception { log.info("开始做任务一"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任务一,耗时:" + (end - start) + "毫秒"); return CompletableFuture.completedFuture("任务一完成"); } 复制代码
按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情。
@Test public void test() throws Exception { long start = System.currentTimeMillis(); CompletableFuture<String> task1 = asyncTasks.test(); CompletableFuture<String> task2 = asyncTasks.test(); CompletableFuture<String> task3 = asyncTasks.test(); CompletableFuture.allOf(task1, task2, task3).join(); long end = System.currentTimeMillis(); log.info("任务全部完成,总耗时:" + (end - start) + "毫秒"); } 复制代码
- 在调用三个异步函数的时候,返回CompletableFuture类型的结果对象
- 通过CompletableFuture.allOf(task1, task2, task3).join()实现三个异步任务都结束之前的阻塞效果
- 三个任务都完成之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。
配置线程池
前面是最简单的使用方法,使用默认的 TaskExecutor。如果想使用自定义的 Executor,可以结合 @Configuration 注解的配置方式,Spring基本上共有五大线程池。
- SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
- SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
- ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
- SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
- ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装,
public ThreadPoolTaskExecutor FebsShiroThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(20); //配置队列大小 executor.setQueueCapacity(200); //线程池维护线程所允许的空闲时间 executor.setKeepAliveSeconds(30); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(ConstantFiledUtil.KMALL_THREAD_NAME_PREFIX); //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean executor.setWaitForTasksToCompleteOnShutdown(true); //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住 executor.setAwaitTerminationSeconds(60); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } 复制代码
线程池的配置很灵活,对核心线程数、最大线程数等属性进行配置。其中,rejection-policy,当线程池已经达到最大线程数的时候,如何处理新任务。可选策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行。我们验证下,线程池的设置是否生效,在 TaskService 中,打印当前的线程名称:
public Future<String> doExecute() throws Exception { log.info("开始做任务一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任务一,耗时:" + (end - start) + "毫秒"); log.info("当前线程为 {}", Thread.currentThread().getName()); return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒"); } 复制代码
在 Spring @Async 异步线程使用过程中,需要注意的是以下的用法会使 @Async 失效:
- 异步方法使用 static 修饰;
- 异步类没有使用 @Component 注解(或其他注解)导致 Spring 无法扫描到异步类;
- 异步方法不能与被调用的异步方法在同一个类中;
- 类中需要使用 @Autowired 或 @Resource 等注解自动注入,不能手动 new 对象;
- 如果使用 Spring Boot 框架必须在启动类中增加 @EnableAsync 注解。
线程上下文信息传递
微服务架构中的一次请求会涉及多个微服务。或者一个服务中会有多个处理方法,这些方法有可能是异步方法。有些线程上下文信息,如请求的路径,用户唯一的 userId,这些信息会一直在请求中传递。如果不做任何处理,我们看下是否能够正常获取这些信息。
如果理由RequestContextHolder 中的请求信息时,报了空指针异常。这说明了请求的上下文信息未传递到异步方法的线程中。RequestContextHolder 的实现,里面有两个 ThreadLocal 保存当前线程下的 request
//得到存储进去的request private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal<RequestAttributes>("Request attributes"); //可被子线程继承的request private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal<RequestAttributes>("Request context"); 复制代码
如何将上下文信息传递到异步线程呢?
Spring 中的 ThreadPoolTaskExecutor 有一个配置属性 TaskDecorator,TaskDecorator 是一个回调接口,采用装饰器模式。
装饰模式是动态的给一个对象添加一些额外的功能,就增加功能来说,装饰模式比生成子类更为灵活。因此 TaskDecorator 主要用于任务的调用时设置一些执行上下文,或者为任务执行提供一些监视/统计。
public interface TaskDecorator { Runnable decorate(Runnable runnable); } 复制代码
decorate 方法,装饰给定的 Runnable,返回包装的 Runnable 以供实际执行。
下面我们定义一个线程上下文拷贝的TaskDecorator。
public class ThreadLocalDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } } 复制代码
实现较为简单,将当前线程的context装饰到指定的Runnable,最后重置当前线程,上下文在线程池的配置中,增加回调的 TaskDecorator 属性的配置:
@Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); // 增加 TaskDecorator 属性的配置 executor.setTaskDecorator(new ThreadLocalDecorator()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } 复制代码
异步方法返回类型只能有两种:
当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面,可以通过注AsyncUncaughtExceptionHandler来捕获此类异常
当返回类型为Future的时候,方法调用过程产生的异常会抛到调用者层面
注意:如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。
SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用会创建一个新的线程。并发大的时候会产生严重的性能问题。