基于 Spring Framework v5.2.6.RELEASE
概述
Spring 终有一种非常简便的方法使 Bean 中的一个方法变成异步执行的方法,那就是在方法上标记 @Async 注解,想要开启这一特性,需要在一个配置类上标记 @EnableAsync 注解。
本文将通过源码分析 @EnableAsync 注解是如何开启这一特性的。
@EnableAsync 分析
@EnableAsync 注解的源码如下。
ElementType.TYPE) (RetentionPolicy.RUNTIME) (AsyncConfigurationSelector.class) (public@interfaceEnableAsync { Class<?extendsAnnotation>annotation() defaultAnnotation.class; booleanproxyTargetClass() defaultfalse; AdviceModemode() defaultAdviceMode.PROXY; intorder() defaultOrdered.LOWEST_PRECEDENCE; }
注解的每一个属性都指定了默认值,后续的分析也会基于默认的属性值进行分析。除此之外,注解上的 @Import 元注解引入了 AsyncConfigurationSelector 类。
从它的类关系中可以看出,AsyncConfigurationSelector 实现了 ImportSelector 接口,因此,当 Spring 扫描到配置类后,会执行它的 selectImports 方法,获取一个包含配置类名称的数组,用于加载对应的配置。
AsyncConfigurationSelector 虽然也包含了selectImports
方法,但是从参数类型中可以看出它不是接口中的selectImports
方法的实现方法,要找到接口中的实现方法,我们需要去 AsyncConfigurationSelector 的父类 AdviceModeImportSelector 中。
publicfinalString[] selectImports(AnnotationMetadataimportingClassMetadata) { Class<?>annType=GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType!=null, "Unresolvable type argument for AdviceModeImportSelector"); AnnotationAttributesattributes=AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes==null) { thrownewIllegalArgumentException(String.format( "@%s is not present on importing class '%s' as expected", annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceModeadviceMode=attributes.getEnum(getAdviceModeAttributeName()); String[] imports=selectImports(adviceMode); if (imports==null) { thrownewIllegalArgumentException("Unknown AdviceMode: "+adviceMode); } returnimports; }
这个方法中,主要是从 @EnableAsync 注解获取各项属性的值,然后使用adviceMode
属性,调用另一个selectImports
方法获取最终的结果。
此处被调用的selectImports
方法,就是 AsyncConfigurationSelector 中的 selectImports
方法。
publicString[] selectImports(AdviceModeadviceMode) { switch (adviceMode) { casePROXY: returnnewString[] {ProxyAsyncConfiguration.class.getName()}; caseASPECTJ: returnnewString[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: returnnull; } }
在 @EnableAsync 注解中,mode的默认值是AdviceMode.PROXY
,因此,这里引入的配置类是 ProxyAsyncConfiguration。
接下来分析 ProxyAsyncConfiguration 类。
ProxyAsyncConfiguration 分析
BeanDefinition.ROLE_INFRASTRUCTURE) (publicclassProxyAsyncConfigurationextendsAbstractAsyncConfiguration { name=TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) (BeanDefinition.ROLE_INFRASTRUCTURE) (publicAsyncAnnotationBeanPostProcessorasyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessorbpp=newAsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<?extendsAnnotation>customAsyncAnnotation=this.enableAsync.getClass("annotation"); if (customAsyncAnnotation!=AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); returnbpp; } }
在 ProxyAsyncConfiguration 中,只有一个 Bean 配置,类型是 AsyncAnnotationBeanPostProcessor,由此可以知道,@EnableAsync 所开启的功能,是通过 Bean 的后处理器来实现的。
上述的方法体中,通过构造方法创建了 AsyncAnnotationBeanPostProcessor 对象。
publicAsyncAnnotationBeanPostProcessor() { setBeforeExistingAdvisors(true); }
构造方法中设置了一个属性值,这个属性是是beforeExistingAdvisors
,定义在父类 AbstractAdvisingBeanPostProcessor 中,这个属性的默认值是false,当它的值为true时,会将新的增强逻辑添加到增强逻辑列表的开头而不是最后。
也就是说,@EnableAsync 提供的异步执行特性,是基于 AOP 特性来实现的。
接着往下看,在创建了 AsyncAnnotationBeanPostProcessor 对象之后,为其配置了一些属性,有一些属性的值是从 @EnableAsync 属性值获取的,还有两个属性值需要留意,就是this.executor
和this.exceptionHandler
,这两个成员变量的值是从哪儿来的呢?
我们可以找到 ProxyAsyncConfiguration 的父类 AbstractAsyncConfiguration,其中有一个标记了 @Autowired 注解的方法。
// org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurersrequired=false) (voidsetConfigurers(Collection<AsyncConfigurer>configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() >1) { thrownewIllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurerconfigurer=configurers.iterator().next(); this.executor=configurer::getAsyncExecutor; this.exceptionHandler=configurer::getAsyncUncaughtExceptionHandler; }
如果我们自己配置了线程池和异常处理器,则会在这里执行配置,这样,我们配置的线程池和异常处理器就会被添加到 AsyncAnnotationBeanPostProcessor 中。
接下来,我们再分析 AsyncAnnotationBeanPostProcessor 后处理器是如何工作的。
AsyncAnnotationBeanPostProcessor 分析
从它的类继承关系中可以看出,它是一个基于 AOP 特性来为 Bean 中的方法提供异步执行功能的 Bean 后处理器。
AsyncAnnotationBeanPostProcessor 同时实现了 BeanFactoryAware 接口,在它的setBeanFactory
方法中,完成了 Advisor 的创建。
publicvoidsetBeanFactory(BeanFactorybeanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisoradvisor=newAsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType!=null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor=advisor; }
这里创建的 Advisor 类型是 AsyncAnnotationAdvisor,创建完之后,它被复制给了advisor
成员变量,这个成员变量定义在 AsyncAnnotationBeanPostProcessor 的父类 AbstractBeanFactoryAwareAdvisingPostProcessor 中。这个advisor
成员变量就是处理增强逻辑的对象。
AsyncAnnotationAdvisor 分析
关于 Spring 是如何在后处理器中为 Bean 创建代理对象以及如何向代理对象中加入增强逻辑的,我之前的文章有很详细的分析,可以阅读之前关于 AOP 原理的分析文章来了解。下面我们直接分析 AsyncAnnotationAdvisor,它是完成方法异步执行的核心。
一个 Advisor 通常有两个非常重要的部分,一个是 Pointcut,用于匹配需要增强的方法,另一个是 Advice 也就是具体的增强逻辑。对于 AsyncAnnotationAdvisor 来说,这两个部分都是在它的构造方法中构建的。
publicAsyncAnnotationAdvisor( Supplier<Executor>executor, Supplier<AsyncUncaughtExceptionHandler>exceptionHandler) { Set<Class<?extendsAnnotation>>asyncAnnotationTypes=newLinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<?extendsAnnotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundExceptionex) { // If EJB 3.1 API not present, simply ignore. } this.advice=buildAdvice(executor, exceptionHandler); this.pointcut=buildPointcut(asyncAnnotationTypes); }
其中可以看到两行关键的代码,他们分别完成了advice
和pointcut
成员变量的构建。
this.advice=buildAdvice(executor, exceptionHandler); this.pointcut=buildPointcut(asyncAnnotationTypes);
下面分别来看这两部分。
Advice 构建
先看buildAdvice
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
Advice 的构建比较简单,这里可以看到,最终构建的 Advice 是一个 AnnotationAsyncExecutionInterceptor 类型的拦截器,除了调用构造方法创建之外,还配置了executor
和exceptionHandler
,这个拦截器应该就是完成 AOP 增强逻辑的拦截器,我们放到后文中分析。
Pointcut 构建
下面再看buildPointcut
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildPointcutprotectedPointcutbuildPointcut(Set<Class<?extendsAnnotation>>asyncAnnotationTypes) { ComposablePointcutresult=null; for (Class<?extendsAnnotation>asyncAnnotationType : asyncAnnotationTypes) { Pointcutcpc=newAnnotationMatchingPointcut(asyncAnnotationType, true); Pointcutmpc=newAnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result==null) { result=newComposablePointcut(cpc); } else { result.union(cpc); } result=result.union(mpc); } return (result!=null?result : Pointcut.TRUE); }
这个方法的逻辑比较简单,首先创建了两个 Pointcut 对象,cpc
用于匹配类型,mpc
用于匹配方法,他们的逻辑都很简单,就是看类或者方法的定义是否包含 @Async 注解。
最后再将两者合并为一个 ComposablePointcut 对象返回,ComposablePointcut 的作用就是将多个 Pointcut 对象合并成一个。
AnnotationAsyncExecutionInterceptor 分析
了解完上面的内容,接下来就开始分析 AnnotationAsyncExecutionInterceptor 拦截器。它是一个包含 AOP 增强逻辑的拦截器,也是完成方法异步调用的核心逻辑。
AnnotationAsyncExecutionInterceptor 要完成它的任务,有两个比较核心的功能,一个是目标方法的匹配,另一个就是拦截器的逻辑。目标方法的匹配逻辑,我们在上文中已经介绍过了,以下主要分析其拦截器逻辑,也就是它的invoke
方法。
以上是 AnnotationAsyncExecutionInterceptor 的类关系图,它实现了 MethodInterceptor 接口,invoke
方法的实现在父类 AsyncExecutionInterceptor 中。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokepublicObjectinvoke(finalMethodInvocationinvocation) throwsThrowable { Class<?>targetClass= (invocation.getThis() !=null?AopUtils.getTargetClass(invocation.getThis()) : null); MethodspecificMethod=ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); finalMethoduserDeclaredMethod=BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutorexecutor=determineAsyncExecutor(userDeclaredMethod); if (executor==null) { thrownewIllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object>task= () -> { try { Objectresult=invocation.proceed(); if (resultinstanceofFuture) { return ((Future<?>) result).get(); } } catch (ExecutionExceptionex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwableex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } returnnull; }; returndoSubmit(task, executor, invocation.getMethod().getReturnType()); }
从上面的源码中可以看到三个关键的步骤:
- 找到目标方法,并根据目标方法获取到执行它的 AsyncTaskExecutor。
- 将目标方法的调用,封装到一个 Callable 异步任务
task
当中。 - 通过doSubmit方法来异步调用上一步封装的
task
。
下面我们详细分析这三个步骤。
AsyncTaskExecutor 查找
AsyncTaskExecutor 在determineAsyncExecutor
方法中完成。
protectedAsyncTaskExecutordetermineAsyncExecutor(Methodmethod) { AsyncTaskExecutorexecutor=this.executors.get(method); if (executor==null) { ExecutortargetExecutor; Stringqualifier=getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor=findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor=this.defaultExecutor.get(); } if (targetExecutor==null) { returnnull; } executor= (targetExecutorinstanceofAsyncListenableTaskExecutor? (AsyncListenableTaskExecutor) targetExecutor : newTaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } returnexecutor; }
首先会从executors
中根据方法获取对应的 AsyncTaskExecutor,executors
是一个用来缓存 Executor 的成员变量。
privatefinalMap<Method, AsyncTaskExecutor>executors=newConcurrentHashMap<>(16);
当第一次进入这个方法的时候,executors
肯定是空的,因此会进入if
语句的逻辑获取 Executor 然后再将其添加到executors
中。在if语句中,首先会通过getExecutorQualifier
方法获取一个qualifier
,我们进入方法查看获取的过程。
// org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifierprotectedStringgetExecutorQualifier(Methodmethod) { // Maintainer's note: changes made here should also be made in// AnnotationAsyncExecutionAspect#getExecutorQualifierAsyncasync=AnnotatedElementUtils.findMergedAnnotation(method, Async.class); if (async==null) { async=AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class); } return (async!=null?async.value() : null); }
这个方法会从目标方法或者其所在的类型上的 @Async 注解的value
属性,作为方法的返回值复制给qualifier
。这个qualifier
的值是一个 Executor 的 Bean 名称,也就是说,我们可以通过 @Async 的value
属性指定执行异步任务的 Executor 的 Bean 名称。
如果qualifier
不是空的,那么,就会通过findQualifiedExecutor方法从 Spring 容器中获取对应的 Executor 实例。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#findQualifiedExecutorprotectedExecutorfindQualifiedExecutor(BeanFactorybeanFactory, Stringqualifier) { if (beanFactory==null) { thrownewIllegalStateException("BeanFactory must be set on "+getClass().getSimpleName() +" to access qualified executor '"+qualifier+"'"); } returnBeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier); }
如果qualifier
是空的,那么就会通过this.defaultExecutor.get()
获取默认的 Executor,那么,默认的 Executor 是什么呢?我们需要在去 AsyncAnnotationAdvisor 的buildAdvice
方法中,回顾一下 AnnotationAsyncExecutionInterceptor 创建的过程。
AnnotationAsyncExecutionInterceptorinterceptor=newAnnotationAsyncExecutionInterceptor(null);
以上是 AnnotationAsyncExecutionInterceptor 创建的语句,从这里找到对应的构造方法。
publicAnnotationAsyncExecutionInterceptor(ExecutordefaultExecutor) { super(defaultExecutor); }
构造方法需要提供一个默认的 Executor,也就是defaultExecutor
参数,这里提供了null
,不过我们可以继续查看父类的构造方法。
publicAsyncExecutionAspectSupport(ExecutordefaultExecutor) { this.defaultExecutor=newSingletonSupplier<>(defaultExecutor, () ->getDefaultExecutor(this.beanFactory)); this.exceptionHandler=SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new); }
在被调用的 AsyncExecutionAspectSupport 的构造方法中,通过getDefaultExecutor
方法,提供了默认的 Executor。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutorprotectedExecutorgetDefaultExecutor(BeanFactorybeanFactory) { ExecutordefaultExecutor=super.getDefaultExecutor(beanFactory); return (defaultExecutor!=null?defaultExecutor : newSimpleAsyncTaskExecutor()); }
这里看到,默认的 Executor 是一个 SimpleAsyncTaskExecutor,也就是说,如果我们没有在项目中配置线程池,则默认使用 SimpleAsyncTaskExecutor 来执行异步任务。
Callable 任务封装
得到 Executor 之后,就是任务的封装,这一步很简单,就是将目标方法的调用放到一个 Callable 类型的任务的call
方法中。
doSubmit 异步执行方法
最后一步就是任务的提交,通过doSubmit
方法完成。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitprotectedObjectdoSubmit(Callable<Object>task, AsyncTaskExecutorexecutor, Class<?>returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { returnCompletableFuture.supplyAsync(() -> { try { returntask.call(); } catch (Throwableex) { thrownewCompletionException(ex); } }, executor); } elseif (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } elseif (Future.class.isAssignableFrom(returnType)) { returnexecutor.submit(task); } else { executor.submit(task); returnnull; } }
其实就是调用了 Executor 的submit
异步执行了任务。
不过这里有一点要说明,虽然在我们没有配置 Excutor 的情况下 ,Spring 会使用默认的 SimpleAsyncTaskExecutor 来执行异步任务,但是 SimpleAsyncTaskExecutor 会为每一个任务创建一个新的线程,而不是使用线程池来完成,很容易导致内存溢出,因此,在实践中最好为异步任务配置合适的线程池。
总结
本文以 @EnableAsync 作为切入点,分析了 Spring 开启基于注解的异步任务特性的原理。