概述
在Spring Framework中分别使用TaskExecutor
和TaskScheduler
接口提供异步执行和任务调度的抽象。
这里我们着重了解基于TaskExecutor支撑的注解@Async是如何实现异步处理的。
Spring中对TaskExecutor的抽象
Spring 2.0版本中提供了一种新的处理执行器(executors)的抽象,即TaskExecutor接口。TaskExecutor接口 与java.util.concurrent.Executor是等价的,其只有一个接口。
public interface TaskExecutor { void execute(Runnable task); }
该接口具有单个方法execute(Runnable task),该方法基于线程池的语义和配置接收要执行的任务。
Spring框架内置的TaskExecutor实现。
SimpleAsyncTaskExecutor
这种TaskExecutor接口的实现不会复用线程,对应每个请求会新创建一个对应的线程来执行。它支持的并发限制将阻止任何超出限制的调用,这个可以通过调用setConcurrencyLimit方法来限制并发数,默认是不限制并发数的。
SyncTaskExecutor
这种TaskExecutor接口的实现不会异步地执行提交的任务,而是会同步使用调用线程来执行,这种实现主要用于没有必要多线程进行处理的情况,比如在进行简单的单元测试时。
ConcurrentTaskExecutor
这种TaskExecutor接口的实现是对JDK5中的java.util.concurrent.Executor的一个包装,通过setConcurrentExecutor(Executor concurrentExecutor)接口可以设置一个JUC中的线程池到其内部来做适配。
还有一个替代方案ThreadPoolTaskExecutor,它通过bean属性的方式配置Executor线程池的属性。一般很少会用到Concurrent TaskExecutor,但如果ThreadPoolTaskExecutor不够健壮满足不了你的需求,那么ConcurrentTaskExecutor也是一种选择。
SimpleThreadPoolTaskExecutor
这个实现实际上是Quartz的SimpleThreadPool的子类,它监听Spring的生命周期回调。当你有一个可能需要Quartz和非Quartz组件共享的线程池时,通常会使用该实现。
ThreadPoolTaskExecutor
该实现只能在Java 5环境中使用,其也是该环境中最常用的实现。它公开了bean属性,用于配置java.util.concurrent.ThreadPoolExecutor并将其包装在TaskExecutor中。如果你需要一些高级的接口,例如ScheduledThreadPoolExecutor,建议使用Concurrent TaskExecutor。
TimerTaskExecutor
该实现使用单个java.util.Timer对象作为其内部异步线程来执行任务。它与SyncTaskExecutor的不同之处在于,该实现对所有提交的任务都在Timer内的单独线程中执行,尽管提交的多个任务的执行是顺序同步的。
小结
如上,Spring框架本身提供了很多TaskExecutor的实现,但是如果不符合你的需要,你可以通过实现TaskExecutor接口来定制自己的执行器。
如何在Spring中使用异步执行
使用TaskExecutor实现异步执行
在Spring中TaskExecutor的实现类是以JavaBeans的方式提供服务的,比如下面这个例子,我们通过xml方式向Spring容器中注入了TaskExecutor的实现者ThreadPoolTaskExecutor的实例。
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!--1. 核心线程个数--> <property name="corePoolSize" value="5" /> <!--2.最大线程个数 --> <property name="maxPoolSize" value="10" /> <!--3.超过核心线程个数的线程空闲多久被回收 --> <property name="keepAliveSeconds" value="60" /> <!--4.缓存队列大小 --> <property name="queueCapacity" value="20" /> <!--5.拒绝策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRuns Policy" /> </property> </bean>
·如上代码我们向Spring容器中注入了一个ThreadPoolTaskExecutor处理器实例,其配置属性与Java并发包中的线程池ThreadPoolExecutor类似。
·其中代码1、2将处理器中核心线程个数设置为5,最大线程个数设置为10。
·代码3设置了线程池中非核心线程空闲60s后会被自动回收。
·代码4设置了线程池阻塞队列的大小为20。
·代码5设置了线程池的拒绝策略,这里设置为CallerRunsPolicy,意为当线程池中的队列满了,并且所有线程都在忙碌的时候,如果此时向处理器提交了新的任务,则新的任务不再是异步执行,而是使用调用线程来执行。
当我们向Spring容器中注入了TaskExecutor的实例后,我们就可以在Spring容器中使用它。
<bean id="asyncExecutorExample" class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample"> <property name="taskExecutor" ref="taskExecutor" /> </bean>
·如上代码通过xml方式向Spring容器注入了AsyncExecutorExample的实例,并且其属性taskExecutor注入了上面创建的名称为taskExecutor的执行器,下面我们看看AsyncExecutorExample的代码。
public class AsyncExecutorExample { private class MessagePrinterTask implements Runnable { private String message; public MessagePrinterTask(String message) { this.message = message; } public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " " + message); } catch (Exception e) { e.printStackTrace(); } } } public TaskExecutor getTaskExecutor() { return taskExecutor; } public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } // 线程池执行器 private TaskExecutor taskExecutor; public void printMessages() { for (int i = 0; i < 6; i++) { taskExecutor.execute(new MessagePrinterTask("Message" + i)); } } }
上述代码的AsyncExecutorExample中有一个类型为TaskExecutor的属性,我们通过setter访问器注入了该属性,其有一个printMessages方法用来触发异步任务执行,这里的异步任务被封装为MessagePrinterTask,其在run方法内先休眠1s模拟任务执行,然后打印输出。
下面我们看看如何把上面的内容组成可执行的程序,首先需要把上面两个xml配置汇总到beans.xml里面,代码如下所示。
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> ... </bean> <bean id="asyncExecutorExample" class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample"> <property name="taskExecutor" ref="taskExecutor" /> </bean> </beans>
然后我们需要编写的测试代码如下所示。
public static void main(String arg[]) throws InterruptedException { // 1.创建容器上下文 ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( new String[] { "beans.xml" }); // 2.获取AsyncExecutorExample实例并调用打印方法 System.out.println(Thread.currentThread().getName() + " begin "); AsyncExecutorExample asyncExecutorExample = applicationContext.getBean(AsyncExecutorExample.class); asyncExecutorExample.printMessages(); System.out.println(Thread.currentThread().getName() + " end "); }
·代码1使用ClassPathXmlApplicationContext创建了一个Spring容器上下文,并且以beans.xml作为容器中bean的元数据。
·代码2从容器上下文中获取AsyncExecutorExample的实例,并且调用了print-Messages方法。由于printMessages方法内的6个任务提交到了执行器线程进行处理,所以main函数所在线程调用printMessages方法后马上返回,然后具体任务是由执行器中的线程执行的。
·运行上面代码,一个可能的输出为:
main begin main end taskExecutor-1 Message0 taskExecutor-3 Message2 taskExecutor-2 Message1 taskExecutor-5 Message4 taskExecutor-4 Message3 taskExecutor-1 Message5
可知具体任务是在执行器线程中执行的,而不是在main函数所在线程中执行的。运行上面的代码后,虽然main函数所在线程会马上结束,并且异步任务也执行完了,但是JVM进程并没有退出,这是因为执行器ThreadPoolTaskExecutor中的线程都是用户线程而不是Deamon线程。而JVM退出的条件是进程中不含有任何用户线程,所以我们要与使用Java并发包中的线程池一样,需要显式关闭线程池。
为此我们在AsyncExecutorExample中添加shutdown方法:
public void shutdown() { if (taskExecutor instanceof ThreadPoolTaskExecutor) { ((ThreadPoolTaskExecutor) taskExecutor).shutdown(); } }
然后在测试类的main函数最后添加如下代码:
// 3.关闭执行器,释放线程 asyncExecutorExample.shutdown();
添加代码后,运行测试代码,输出如下所示。
main begin main end java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at com.jiaduo.async.AsyncProgram.AsyncExecutorExample$MessagePrinterTask.run(AsyncExecutorExample.java:17) ...
如上可知我们的任务都被中断了(因为我们的任务中调用了sleep方法),这是因为默认情况下执行器ThreadPoolTaskExecutor中的变量waitForTasksToComplete OnShutdown为false,意为关闭执行器时不等待正在执行的任务执行完毕就中断执行任务的线程。所以我们需要修改ThreadPoolTaskExecutor注入的配置,代码如下所示。
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> ... <property name="waitForTasksToCompleteOnShutdown" value="true"></property> </bean>
如上配置在注入ThreadPoolTaskExecutor的配置属性最后添加了变量waitForTasksTo CompleteOnShutdown为true的配置,然后运行测试类,就会发现等异步任务执行完毕后,当前jvm进程就不存在了,这说明执行器已经被优雅地退出了。
使用注解@Async实现异步执行
在Spring中可以在方法上添加@Async注释,以便异步执行该方法。换句话说,调用线程将在调用含有@Async注释的方法时立即返回,并且该方法的实际执行将发生在Spring的TaskExecutor异步处理器线程中。需要注意的是,该注解@Async默认是不会解析的,你可以使用如下两种方式开启该注解的解析。
·基于xml配置Bean时需要加入如下配置,才可以开启异步处理:
<task:annotation-driven />
·在基于注解的情况下可以添加如下注解来启动异步处理:
@EnableAsync
下面我们看看如何使用第一种方式开启并使用异步执行,首先我们需要在beans-annotation.xml中配置如下代码。
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!--1.开启Async注解的解析 --> <task:annotation-driven /> <!--2.注入业务Bean --> <bean id="asyncCommentExample" class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample"> </bean> </beans>
如上代码1通过配置开启了对注解Async的解析,代码2注入了我们的业务Bean,其代码如下所示。
public class AsyncAnnotationExample { @Async public void printMessages() { for (int i = 0; i < 6; i++) { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 、 Message" + i); } catch (Exception e) { e.printStackTrace(); } } } }
如上代码的printMessages方法添加了@Async注解,方法内循环6次,循环中先让执行线程休眠1s,然后打印输出。
下面我们组合上面的代码片段形成一个可执行程序进行测试,测试代码如下所示。
public static void main(String arg[]) throws InterruptedException { // 1.创建容器上下文 ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( new String[] { "beans-annotation.xml" }); // 2. 获取AsyncAnnotationExample实例并调用打印方法 System.out.println(Thread.currentThread().getName() + " begin "); AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class); asyncCommentExample.printMessages(); System.out.println(Thread.currentThread().getName() + " end "); }
如上代码1使用beans-annotation.xml作为容器Bean的元数据创建了Spring上下文,代码2从中获取了AsyncAnnotationExample的实例,然后调用其printMessages,main线程调用该方法后,该方法会马上返回,printMessages内的任务是使用Spring框架内的默认执行器SimpleAsyncTaskExecutor中的线程来执行的。运行上面代码的一个可能的输出结果如下所示。
main begin main end SimpleAsyncTaskExecutor-1 Message0 SimpleAsyncTaskExecutor-1 Message1 SimpleAsyncTaskExecutor-1 Message2 SimpleAsyncTaskExecutor-1 Message3 SimpleAsyncTaskExecutor-1 Message4 SimpleAsyncTaskExecutor-1 Message5
可知具体执行异步任务的是SimpleAsyncTaskExecutor中的线程,而不是main函数所在线程。当然我们可以指定自己的执行器来执行我们的异步任务,这需要我们在xml配置自己的执行器,代码如下所示。
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" ... <!--0.创建自己的业务线程池处理器 --> <task:executor id="myexecutor" pool-size="5" /> <!--1.开启Async注解的解析 --> <task:annotation-driven executor="myexecutor"/> <!--2.注入业务Bean --> <bean id="asyncCommentExample" class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample"> </bean> </beans>
如上代码0为我们创建了自己的线程池处理器,代码1则把我们的线程池处理器作为异步任务的处理器,运行如上代码,可以看到一个可能的输出结果如下:
main begin main end myexecutor-1 Message0 myexecutor-1 Message1 myexecutor-1 Message2 myexecutor-1 Message3 myexecutor-1 Message4 myexecutor-1 Message5
由如上代码可知,异步任务是使用我们自己的线程池执行器执行的。
下面我们看看第二种方式是如何使用注解方式开启异步处理的,首先我们需要在xml里面进行如下配置。
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!--1.扫描bean的包路径 --> <context:component-scan base-package="com.jiaduo.async.AsyncProgram" /> </beans
如上代码1配置了包扫描路径,框架会扫描该包下面含有@Component注解的从Bean到Spring的容器。
然后要在AsyncAnnotationExample类中加上如下注解。
@EnableAsync//开启异步执行 @Component//把该Bean注入Spring容器 public class AsyncAnnotationExample { @Async public void printMessages() { ... } }
如上代码使用了注解@EnableAsync开启异步执行。
另外需要注意的是@Async注解本身也是有参数的,比如我们可以在某一个需要异步处理的方法上加@Async,注解时指定使用哪一个线程池处理器来进行异步处理。
@Async("bizExecutor") void doSomething(String s) { .... }
如上代码指定了方法doSomething使用名称为bizExecutor的线程池处理器来执行异步任务。
上面我们讲解的异步任务都是没有返回结果的,其实基于@Async注解的异步处理也是支持返回值的,但是返回值类型必须是Future或者其子类类型的,比如返回的Future类型可以是普通的java.util.concurrent.Future类型,也可以是Spring框架的org.springframework.util.concurrent.ListenableFuture类型,或者JDK8中的java.util.concurrent.CompletableFuture类型,又或者Spring中的AsyncResult类型等。这提供了异步执行的好处,以便调用者可以在调用Future上的get()之前处理其他任务。
如下代码展示了在AsyncAnnotationExample中,方法doSomething是如何在具有返回值的方法上使用注解@Async的。
@Async public CompletableFuture<String> doSomething() { // 1.创建future CompletableFuture<String> result = new CompletableFuture<String>(); // 2.模拟任务执行 try { Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "doSomething"); } catch (Exception e) { e.printStackTrace(); } result.complete("done"); // 3.返回结果 return result; }
代码1创建了一个CompletableFuture类型的Future实例,代码2休眠5s模拟任务执行,然后设置Future的执行结果,代码3则返回Future对象。
下面修改我们的测试代码对其进行测试,代码如下所示。
public static void main(String arg[]) throws InterruptedException { // 1.创建容器上下文 ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( new String[] { "beans-annotation.xml" }); // 2. 获取AsyncExecutorExample实例并调用打印方法 System.out.println(Thread.currentThread().getName() + " begin "); AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class); // 3.获取异步future并设置回调 CompletableFuture<String> resultFuture = asyncCommentExample.doSomething(); resultFuture.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable u) { if (null == u) { System.out.println(Thread.currentThread().getName() + " " + t); } else { System.out.println("error:" + u.getLocalizedMessage()); } } }); System.out.println(Thread.currentThread().getName() + " end "); }
代码3的main函数所在线程调用了AsyncAnnotationExample的doSomething方法,该方法会马上返回一个CompletableFuture,我们在其上设置了回调函数,之后main线程就退出了,最终doSomething方法内的代码就是使用处理器线程池中的线程来执行的,并当执行完毕后回调我们设置的回调函数。
运行上面代码的输出如下所示。
main begin main end SimpleAsyncTaskExecutor-1doSomething SimpleAsyncTaskExecutor-1 done
如上代码可知,doSomething方法的执行是使用SimpleAsyncTaskExecutor线程池处理器来执行的,而不是main函数所在线程进行执行。
最后看看使用@Async注解遇到异常时该如何处理。当@Async方法具有Future类型返回值时,很容易管理在方法执行期间抛出的异常,因为会在调用get方法等待结果时抛出该异常。但是对于void返回类型来说,异常未被捕获且无法传输。这时候可以提供AsyncUncaughtExceptionHandler来处理该类异常。以下示例显示了如何执行该操作。
public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { // handle exception } }
然后我们在xml里面配置即可:
<task:annotation-driven exception-handler="myAsyncUncaughtExceptionHandler" /> <bean id="myAsyncUncaughtExceptionHandler" class="com.artisan.async.AsyncProgram.MyAsyncUncaughtExceptionHandler"></bean>
如上代码的xml配置首先创建了实例myAsyncUncaughtExceptionHandler,然后将其设置到注解annotation-driven中,在异步任务中抛出异常时会在MyAsyncUncaught ExceptionHandler的handleUncaughtException方法中得到处理。
由上可知基于@Async注解实现异步执行的方式时,大大简化了我们异步编程的运算负担,我们不必再显式地创建线程池并把任务手动提交到线程池内,只要直接在需要异步执行的方法上添加@Async注解即可。当然,当我们需要使用自己的线程池来异步执行标注@Async的方法时,还是需要显式创建线程池的,但这时并不需要显式提交任务到线程池。