概述
在Spring中调用线程将在调用含有@Async注释的方法时立即返回,Spring是如何做到的呢?其实是其对标注@Async注解的类做了代理,比如下面的类Async-AnnotationExample。
public class AsyncAnnotationExample { @Async public CompletableFuture<String> doSomething() { // 1.创建future CompletableFuture<String> result = new CompletableFuture<String>(); // 2.模拟任务执行 try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "doSomething"); } catch (Exception e) { e.printStackTrace(); } result.complete("done"); // 3.返回结果 return result; } }
由于AsyncAnnotationExample类中方法doSomething被标注了@Async注解,所以Spring框架在开启异步处理后会对AsyncAnnotationExample的实例进行代理,代理后的类代码框架如下所示。
public class AsyncAnnotationExampleProxy { public AsyncAnnotationExample getAsyncTask() { return asyncTask; } public void setAsyncAnnotationExample(AsyncAnnotationExample asyncTask) { this.asyncTask = asyncTask; } private AsyncAnnotationExample asyncTask; private TaskExecutor executor = new SimpleAsyncTaskExecutor(); public CompletableFuture<String> dosomthingAsyncFuture() { return CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return asyncTask.dosomthing().get(); } catch (Throwable e) { throw new CompletionException(e); } } },executor); } }
如上代码所示,Spring会对AsyncAnnotationExample类进行代理,并且会把AsyncAnnotationExample的实例注入AsyncAnnotationExampleProxy内部,当我们调用AsyncAnnotationExample的dosomthing方法时,实际调用的是AsyncAnnotation ExampleProxy的dosomthing方法,后者使用CompletableFuture.supplyAsync开启了一个异步任务(其马上返回一个CompletableFuture对象),并且使用默认的SimpleAsync TaskExecutor线程池作为异步处理线程,然后在异步任务内具体调用了AsyncAnnotationExample实例的dosomthing方法。
默认情况下,Spring框架是使用Cglib对标注@Async注解的方法进行代理的,具体拦截器是AnnotationAsyncExecutionInterceptor,我们看看其invoke方法。
public Object invoke(final MethodInvocation invocation) throws Throwable { //1.被代理的目标对象 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); //2. 获取被代理的方法 Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); //3. 判断使用哪个执行器执行被代理的方法 AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } //4. 使用Callable包装要执行的方法 Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; //5. 提交包装的Callable任务到指定执行器执行 return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
·代码1获取被代理的目标对象的Class对象,本例中为class:com.artisan.async.AsyncProgram.AsyncAnnotationExample的Class对象;
·代码2获取被代理的方法,本例中为public java.util.concurrent.CompletableFuture:com.artisan.async.AsyncProgram.AsyncAnnotationExample.dosomthing();
·代码3根据规则获取使用哪个执行器TaskExecutor执行被代理的方法,其代码如下所示。
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16); protected AsyncTaskExecutor determineAsyncExecutor(Method method) { //4.1获取对应方法的执行器 AsyncTaskExecutor executor = this.executors.get(method); //4.2不存在则按照规则查找 if (executor == null) { //4.2.1 如果注解@Async中指定了执行器名称 Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } //4.2.2 获取默认执行器 else { targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } //4.2.3 if (targetExecutor == null) { return null; } //4.2.4 添加执行器到缓存 executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } //4.3返回查找的执行器 return executor; }
代码4.1从缓存executors中尝试获取method方法对应的执行器,如果存在则直接执行代码4.3返回;否则执行代码4.2.1判断方法的注解@Async中是否指定了执行器名称,如果有则尝试从Spring的bean工厂内获取该名称的执行器的实例,否则执行代码4.2.2获取默认的执行器(SimpleAsyncTaskExecutor),然后代码4.2.4把执行器放入缓存。
到这里就探讨完成了AnnotationAsyncExecutionInterceptor的invoke方法内代码3是如何确定那个执行器,然后在invoke方法中的代码4使用Callable包装要执行的方法,代码5提交包装的Callable任务到指定执行器。
到这里所有的执行使用的都是调用线程,调用线程提交异步任务到执行器后就返回了,异步任务真正执行的是具体执行器中的线程。下面我们看看代码5 doSubmit的代码。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { //5.1判断方法返回值是否为CompletableFuture类型或者是其子类 if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } //5.2判断返回值类型是否为ListenableFuture类型或者是其子类 else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } //5.3判断返回值类型是否为ListenableFuture类型或者是其子类 else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } //5.4其他情况下没有返回值 else { executor.submit(task); return null; } }
·代码5.1判断方法返回值是否为CompletableFuture类型或者是其子类,如果是则把任务使用CompletableFuture.supplyAsync方法提交到线程池executor执行,该方法会马上返回一个CompletableFuture对象。
·代码5.2判断方法返回值是否为ListenableFuture类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个ListenableFuture对象。
·代码5.3判断方法返回值是否为Future类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个Future对象。
·代码5.4说明方法不需要返回值,直接提交任务到线程池executor后返回null。
上面我们讲解了代理拦截器AnnotationAsyncExecutionInterceptor的invoke方法如何对标注@Async的方法进行处理,实现异步执行的。其实还有一部分还没讲,前面说了要开始异步处理,必须使用@EnableAsync注解或者task:annotation-driven/来开启异步处理,那么这两个部分背后到底做了什么呢?下面我们就来一探究竟。
首先我们看看添加@EnableAsync注解后发生了什么?在Spring容器启动的过程中会有一系列扩展接口对Bean的元数据定义、初始化、实例化做拦截处理,也存在一些处理器类可以动态地向Spring容器添加一些框架需要使用的Bean实例。其中ConfigurationClassPostProcessor处理器类则是用来解析注解类,并把其注册到Spring容器中的,其可以解析标注@Configuration、@Component、@ComponentScan、@Import、@ImportResource等的Bean。当我们使用context:annotation-config/或者context:component-scan/时,Spring容器会默认把ConfigurationClassPostProcessor处理器注入Spring容器。
而@EnableAsync的定义如下:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { ... }
所以我们添加了@EnableAsync注解后,ConfigurationClassPostProcessor会解析其中的@Import(AsyncConfigurationSelector.class),并把AsyncConfigurationSelector的实例注入Spring容器,其代码如下所示。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } }
AsyncConfigurationSelector实现了ImportSelector接口的selectImports方法,根据AdviceMode参数返回需要导入到Spring容器的Bean的全路径包名。该方法会在ConfigurationClassPostProcessor中的ConfigurationClassParser类中调用。默认情况下的adviceMode为PROXY,所以会把ProxyAsyncConfiguration的实例注入Spring容器。
ProxyAsyncConfiguration的代码如下所示。
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTarget Class")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
如上代码ProxyAsyncConfiguration的asyncAdvisor方法添加了@Bean注解,所以该方法返回的Bean也会被注入Spring容器。该方法创建了AsyncAnnotationBean PostProcessor处理器,所以AsyncAnnotationBeanPostProcessor的一个实例会被注入Spring容器中,由于其实现了BeanFactoryAware接口,所以Spring框架会调用其setBeanFactory(BeanFactory beanFactory)方法把Spring BeanFactory(存放bean的容器)注入该Bean,setBeanFactory方法代码如下所示。
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
如上代码创建了一个AsyncAnnotationAdvisor的实例并保存到了AsyncAnnotation BeanPostProcessor的advisor变量。Spring中每个AsyncAnnotationAdvisor都包含一个Advice(切面逻辑)和一个PointCut(切点),也就是会对符合PointCut的方法使用Advice进行功能增强,对应Advice和PointCut是在AsyncAnnotationAdvisor构造函数内创建的。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { //6.1.异步注解类型 Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { } //6.2创建切面逻辑 this.advice = buildAdvice(executor, exceptionHandler); //6.3创建切点 this.pointcut = buildPointcut(asyncAnnotationTypes); }
如上代码6.1收集注解@Async和@javax.ejb.Asynchronous到asyncAnnotationTypes,代码6.2则创建Advice,其代码如下所示。
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
由上述代码可知,这里创建了AnnotationAsyncExecutionInterceptor拦截器作为切面逻辑。下面看看代码6.3如何创建切点。
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
在上述代码中使用收集的注解集合asyncAnnotationTypes,并在每个注解处创建了一个AnnotationMatchingPointcut作为切点,AnnotationMatchingPointcut内部的AnnotationClassFilter的方法matches则作为某个方法是否满足切点的条件,具体代码如下所示。
public boolean matches(Class<?> clazz) { return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(clazz, this.annotationType) : clazz.isAnnotationPresent(this.annotationType)); }
由如上代码可知,判断方法通过是否有注解@Async为依据来判断方法是否符合切点。
到此我们把AsyncAnnotationBeanPostProcessor的setBeanFactory(BeanFactory bean-Factory)方法逻辑讲解完毕了,其内部保存了一个AsyncAnnotationAdvisor对象用来对Spring容器中符合条件(这里为含有@Async注解的方法的Bean)的Bean的方法进行功能增强,下面我们看看AsyncAnnotationBeanPostProcessor的postProcess AfterInitialization方法是如何对这些符合条件的Bean进行代理的。
public Object postProcessAfterInitialization(Object bean, String beanName) { ... if (isEligible(bean, beanName)) { //7.1 ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } //7.2 设置拦截器 proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); //7.3 获取代理类 return proxyFactory.getProxy(getProxyClassLoader()); } // No proxy needed. return bean; }
如上代码7.1使用prepareProxyFactory创建了代理工厂,其代码如下所示。
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) { ProxyFactory proxyFactory = new ProxyFactory(); proxyFactory.copyFrom(this); proxyFactory.setTarget(bean); return proxyFactory; }
代码7.2则设置在其setBeanFactory方法内创建的AsyncAnnotationAdvisor对象作为Advisor,代码7.3从代理工厂获取代理后的Bean实例并返回到Spring容器,所以当我们调用含有@Async注解的Bean的方法时候,实际调用的是被代理后的Bean。
当我们调用被代理的类的方法时,代理类内部会先使用AsyncAnnotationAdvisor中的PointCut进行比较,看其是否符合切点条件(方法是否含有@Async)注解,如果不符合则直接调用被代理的对象的原生方法,否则调用AsyncAnnotationAdvisor内部的AnnotationAsyncExecutionInterceptor进行拦截异步处理。
在了解添加@EnableAsync注解后会发生什么后,下面我们来看看当添加标签<task:annotation-driven/>开启异步处理时,背后又发生了什么?在Spring中对于标签<XXX:/>总是会存在名称为XXXTaskNamespaceHandler的处理器负责该标签的解析,所以对于标签,自然存在TaskNamespaceHandler处理器负责其解析,其代码如下所示。
public class TaskNamespaceHandler extends NamespaceHandlerSupport { @Override public void init() { this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser()); } }
由如上代码可知,<task:annotation-driven/>
是使用AnnotationDrivenBeanDefinitionParser来进行解析的,下面我们看看其parse方法。
public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser { ... @Override @Nullable public BeanDefinition parse(Element element, ParserContext parserContext) { Object source = parserContext.extractSource(element); ... //8.1 String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerAsyncExecutionAspect(element, parserContext); } else { //8.2 mode="proxy" if (registry.containsBeanDefinition(TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)) { parserContext.getReaderContext().error( "Only one AsyncAnnotationBeanPostProcessor may exist within the context.", source); } else { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( "org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor"); builder.getRawBeanDefinition().setSource(source); String executor = element.getAttribute("executor"); if (StringUtils.hasText(executor)) { builder.addPropertyReference("executor", executor); } String exceptionHandler = element.getAttribute("exception-handler"); if (StringUtils.hasText(exceptionHandler)) { builder.addPropertyReference("exceptionHandler", exceptionHandler); } if (Boolean.valueOf(element.getAttribute(AopNamespaceUtils.PROXY_TARGET_CLASS_ATTRIBUTE))) { builder.addPropertyValue("proxyTargetClass", true); } registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME); } } //8.3 Finally register the composite component. parserContext.popAndRegisterContainingComponent(); return null; } }
由如上代码可知,其主要是用来创建AsyncAnnotationBeanPostProcessor在Spring容器中的元数据定义,并注册到Spring容器中,剩下的流程就与基于@EnableAsync注解开启异步处理的流程一样了。
小结
我们梳理如何使用Spring框架中的@Async进行异步处理,以及其内部如何使用代理的方式来实现,并且可知使用@Async实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了的负担