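Background
To enable asynchronous tasks:
- Add the @Async annotation to the method
- Add @EnableAsync to the application's main class or to a configuration class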
@EnableAsync
Source Javadoc
Enables Spring's asynchronous method execution capability, similar to functionality found in Spring's task:* XML namespace. To be used together with @Configuration classes as follows, enabling annotation-driven async processing for an entire Spring application context:
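- The main purpose of @EnableAsync is to switch on Spring's asynchronous method execution, much like the task:* tags in the older XML configuration. It must be used together with @Configuration.
Usage pattern
@Configuration
@EnableAsync
public class AppConfig {
}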
MyAsyncBean is a user-defined type with one or more methods annotated with either Spring's @Async annotation, the EJB 3.1 @javax.ejb.Asynchronous annotation, or any custom annotation specified via the annotation() attribute. The aspect is added transparently for any registered bean, for instance via this configuration:
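- MyAsyncBean is a user-defined type with one or more methods annotated with @Async, with the EJB 3.1 @javax.ejb.Asynchronous annotation, or with a custom annotation specified through the annotation() attribute. The async aspect is applied transparently to any such registered bean.
@Configuration
public class AnotherAppConfig {

    @Bean
    public MyAsyncBean asyncBean() {
        return new MyAsyncBean();
    }
}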
By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.
- By default Spring first looks for a unique bean of type TaskExecutor, then for an Executor bean named taskExecutor. If neither exists, it falls back to a SimpleAsyncTaskExecutor.
To customize all this, implement AsyncConfigurer and provide: your own Executor through the getAsyncExecutor() method, and your own AsyncUncaughtExceptionHandler through the getAsyncUncaughtExceptionHandler() method.
Customizing the UncaughtExceptionHandler / getAsyncExecutor
- Implement the AsyncConfigurer interface: override getAsyncExecutor to supply the async executor and getAsyncUncaughtExceptionHandler to supply the handler for uncaught async exceptions
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // MyAsyncUncaughtExceptionHandler is a user-defined class
        return new MyAsyncUncaughtExceptionHandler();
    }
}
If only one item needs to be customized, null can be returned to keep the default settings. Consider also extending from AsyncConfigurerSupport when possible. Note: In the above example the ThreadPoolTaskExecutor is not a fully managed Spring bean. Add the @Bean annotation to the getAsyncExecutor() method if you want a fully managed bean. In such circumstances it is no longer necessary to manually call the executor.initialize() method as this will be invoked automatically when the bean is initialized.
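To make the role of the uncaught-exception handler more concrete, here is a minimal sketch in plain JDK code, with no Spring involved. It models why a void async method cannot report a failure to its caller and why a handler callback is needed. UncaughtHandler, VoidAsyncDemo and the method name "sendMail" are hypothetical names invented for this illustration; they are not Spring types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the idea behind AsyncUncaughtExceptionHandler.
interface UncaughtHandler {
    void handle(Throwable ex, String methodName);
}

class VoidAsyncDemo {

    /** Runs a failing "void" task on a pool thread and returns what the handler saw. */
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        UncaughtHandler handler = (ex, name) -> seen.add(name + ": " + ex.getMessage());

        ExecutorService pool = Executors.newSingleThreadExecutor();
        // The caller gets no Future back, so the wrapper has to catch and delegate.
        pool.execute(() -> {
            try {
                throw new IllegalStateException("boom");
            } catch (Throwable ex) {
                handler.handle(ex, "sendMail");
            }
        });

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller of pool.execute never sees the exception; only the handler does, which is the gap a custom getAsyncUncaughtExceptionHandler() is meant to fill.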
For comparison, the equivalent XML configuration
<beans>
    <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
    <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
    <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
    <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
</beans>
Key points
The mode() attribute controls how advice is applied: If the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying. Please note that proxy mode allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way.
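- In other words, in proxy mode an @Async method must be invoked through the proxy, that is, from another bean. A call from another method of the same class bypasses the proxy and runs synchronously.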
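The self-invocation limitation is a property of proxies in general, so it can be reproduced with a plain JDK dynamic proxy. The sketch below is not Spring code; Greeter, GreeterImpl and SelfInvocationDemo are hypothetical names. It records which method calls actually pass through the proxy's handler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Greeter {
    String outer();
    String inner();
}

class GreeterImpl implements Greeter {
    @Override
    public String outer() {
        // this.inner() is a local call on the target object: the proxy never sees it.
        return "outer->" + inner();
    }

    @Override
    public String inner() {
        return "inner";
    }
}

class SelfInvocationDemo {

    /** Returns the names of the methods that went through the proxy's handler. */
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Greeter target = new GreeterImpl();

        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());   // where async advice would hook in
            return method.invoke(target, args);
        };

        Greeter proxy = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);

        proxy.outer();   // outer() is intercepted, the nested inner() call is not
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

Only outer is recorded. If inner() were the @Async method, the advice would never run for it when called this way.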
Note that if the mode() is set to AdviceMode.ASPECTJ, then the value of the proxyTargetClass() attribute will be ignored. Note also that in this case the spring-aspects module JAR must be present on the classpath, with compile-time weaving or load-time weaving applying the aspect to the affected classes. There is no proxy involved in such a scenario; local calls will be intercepted as well.
- Alternatively, the advice can be woven in ASPECTJ mode, which requires the spring-aspects module JAR on the classpath.
Feature analysis
@Async
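- The annotation marks a method for asynchronous execution. It can also be placed on a class, in which case every method of the class runs asynchronously.
- Parameters are unrestricted, but the return type must be void or a Future (including the ListenableFuture interface and the CompletableFuture class)
- The Future handed to the caller by the proxy is the real asynchronous handle and is used to track the async method's result. The method itself can return an AsyncResult (Spring's implements ListenableFuture; EJB has its own AsyncResult) or a CompletableFuture
- On a class it applies to all methods of the class; on a method it overrides the class-level setting
- The value attribute names the (user-defined) executor bean, an Executor or TaskExecutor, that should run the method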
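What the caller sees with the two kinds of future can be modeled with the JDK alone. This is only a sketch of the calling pattern, not of Spring's machinery: ReturnTypeDemo and its methods are hypothetical, and the explicit pool stands in for whatever executor Spring would pick.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ReturnTypeDemo {

    /** Future variant: the caller gets a handle and blocks on get() when it needs the value. */
    static Future<Integer> lengthOf(ExecutorService pool, String text) {
        Callable<Integer> task = text::length;
        return pool.submit(task);
    }

    /** CompletableFuture variant: the caller can chain callbacks instead of blocking early. */
    static CompletableFuture<Integer> lengthOfAsync(ExecutorService pool, String text) {
        return CompletableFuture.supplyAsync(text::length, pool);
    }

    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            int a = lengthOf(pool, "spring").get();     // blocks until the task is done
            int b = lengthOfAsync(pool, "async")
                    .thenApply(n -> n * 10)             // runs once the length is available
                    .join();
            return a + b;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A void method would simply be submitted and forgotten, which is why its exceptions need the separate handler described earlier.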
@EnableAsync
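- Switches on Spring's asynchronous method execution, like the task:* tags in XML; must be used together with @Configuration
- By default Spring first looks for a unique bean of type TaskExecutor, then for an Executor bean named taskExecutor; if neither exists, it uses a SimpleAsyncTaskExecutor
- As described in the customization section above, implement AsyncConfigurer and override getAsyncExecutor to supply the async executor and getAsyncUncaughtExceptionHandler to supply the handler for uncaught async exceptions
- @Configuration: the annotation-based configuration and the XML are essentially equivalent, except that the annotation-based one can also set a thread-name prefix (AppConfig -> getAsyncExecutor -> setThreadNamePrefix above)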
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    // Note: lets users plug in their own async annotation; by default Spring's @Async
    // and the EJB 3.1 @javax.ejb.Asynchronous are detected.
    /**
     * Indicate the 'async' annotation type to be detected at either class
     * or method level.
     * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
     * {@code @javax.ejb.Asynchronous} annotation will be detected.
     * <p>This attribute exists so that developers can provide their own
     * custom annotation type to indicate that a method (or all methods of
     * a given class) should be invoked asynchronously.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    // Note: whether to create CGLIB subclass proxies; only applies when mode is PROXY.
    // Setting it to true also upgrades other Spring-managed beans to CGLIB subclass proxies.
    /**
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies.
     * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
     * <p>The default is {@code false}.
     * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
     * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
     * For example, other beans marked with Spring's {@code @Transactional} annotation
     * will be upgraded to subclass proxying at the same time. This approach has no
     * negative impact in practice unless one is explicitly expecting one type of proxy
     * vs. another; for example, in tests.
     */
    boolean proxyTargetClass() default false;

    // Note: how the async advice is applied, PROXY by default. To support a non-async method
    // calling an async method of the same class, set this to ASPECTJ.
    /**
     * Indicate how async advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    // Note: the order in which the async annotation bean post-processor is applied. The default is
    // the lowest precedence (Integer.MAX_VALUE; a smaller value means a higher precedence).
    /**
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}
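Execution flow
The core is @Import(AsyncConfigurationSelector.class). AsyncConfigurationSelector is an ImportSelector (through AdviceModeImportSelector); its selectImports() method is shown below:
Selector: based on the AdviceMode declared by the @EnableAsync on the @Configuration class, it decides which implementation of the abstract async configuration class to use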
/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY: // PROXY mode: use ProxyAsyncConfiguration
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ: // ASPECTJ mode: use AspectJAsyncConfiguration
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }
}
Proxy-based advice (PROXY mode, JDK interface proxies by default) selects ProxyAsyncConfiguration:
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
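- Create an async-annotation bean post-processor (AsyncAnnotationBeanPostProcessor)
- If the user set a custom annotation attribute on @EnableAsync, register it as the async annotation type
- Executor: set the task executor
- AsyncUncaughtExceptionHandler: set the exception handler
- Set whether to upgrade to CGLIB subclass proxies (off by default)
- Set the order; by default the post-processor runs last
ProxyAsyncConfiguration does just two things:
- It extends AbstractAsyncConfiguration
- It defines one bean: AsyncAnnotationBeanPostProcessor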
AbstractAsyncConfiguration
/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    protected AnnotationAttributes enableAsync; // attributes of the @EnableAsync annotation

    protected Executor executor; // task executor (java.util.concurrent.Executor, designed by Doug Lea)

    protected AsyncUncaughtExceptionHandler exceptionHandler; // exception handler

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }
}
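It holds three things:
- 1) the annotation attributes
- 2) the async task executor
- 3) the exception handler
Methods:
- 1) setImportMetadata sets the annotation attributes
- 2) setConfigurers sets the async task executor and the exception handler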
AsyncAnnotationBeanPostProcessor
When the AsyncAnnotationBeanPostProcessor bean is initialized, its setBeanFactory method (from the BeanFactoryAware interface) constructs the AsyncAnnotationAdvisor, the advisor for the async annotation.
@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
AsyncAnnotationAdvisor:
AsyncAnnotationAdvisor sets up the AOP interception that makes the call asynchronous
- AsyncAnnotationBeanPostProcessor -> postProcessAfterInitialization()
- The actual post-processing is implemented by the parent class AbstractAdvisingBeanPostProcessor,
- which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    // The bean is already a proxy: add the Advisor to it (ProxyFactory -> AdvisedSupport -> Advised)
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }
    // Otherwise build a ProxyFactory, add the interfaces to proxy, set the advisor,
    // and return the proxy object (created through an AopProxy)
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
    // No async proxy needed.
    return bean;
}
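- isEligible checks whether the class, or any method of the class, carries the annotation. AsyncAnnotationAdvisor implements the PointcutAdvisor interface.
- The proxy is created through the AopProxy interface; with interface-based proxying, as here, the implementation actually used is JdkDynamicAopProxy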
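The kind of check isEligible performs can be approximated with plain reflection. This is a simplified sketch, not Spring's implementation, which goes through the advisor's pointcut (AopUtils.canApply). @MyAsync is a hypothetical marker annotation standing in for @Async, and the three service classes exist only for the demonstration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface MyAsync {
}

class PlainService {
    public void work() { }
}

class MethodLevelService {
    public void sync() { }

    @MyAsync
    public void async() { }
}

@MyAsync
class ClassLevelService {
    public void work() { }
}

class EligibilityDemo {

    /** True if the class itself or any of its public methods carries @MyAsync. */
    static boolean isEligible(Class<?> type) {
        if (type.isAnnotationPresent(MyAsync.class)) {
            return true;
        }
        for (Method method : type.getMethods()) {
            if (method.isAnnotationPresent(MyAsync.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isEligible(PlainService.class));        // false
        System.out.println(isEligible(MethodLevelService.class));  // true
        System.out.println(isEligible(ClassLevelService.class));   // true
    }
}
```

Only beans that pass such a check get a proxy; everything else is returned untouched by the post-processor.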
// Excerpt from JdkDynamicAopProxy.invoke
// Get the interceptor chain for this method
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
    // We can skip creating a MethodInvocation: just invoke the target directly
    // Note that the final invoker must be an InvokerInterceptor so we know it does
    // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
    // Build the method invocation
    invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
    // Proceed to the joinpoint through the interceptor chain.
    retVal = invocation.proceed();
}
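- The interceptor for @Async is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. MethodInterceptor is an Advice in AOP terms (the handler that runs at the pointcut).
- Since chain is not empty, the second branch runs: a ReflectiveMethodInvocation is built and its proceed method is called.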
@Override
public Object proceed() throws Throwable {
    // Once every interceptor in the chain has been invoked (or the chain is empty),
    // invoke the proxied method itself
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // Evaluate dynamic method matcher here: static part will already have
        // been evaluated and found to match.
        InterceptorAndDynamicMethodMatcher dm =
                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
        if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
            return dm.interceptor.invoke(this);
        }
        else {
            // Dynamic matching failed.
            // Skip this interceptor and invoke the next in the chain.
            return proceed();
        }
    }
    else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}
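The index-based traversal in proceed() is easier to see in a stripped-down model. The sketch below is not Spring code: Interceptor, Invocation and ChainDemo are hypothetical, there are no dynamic matchers, and the target is just a Supplier. It shows how each interceptor calls back into proceed() until the index reaches the end of the chain and the target runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified interceptor contract.
interface Interceptor {
    Object invoke(Invocation invocation);
}

class Invocation {
    private final List<Interceptor> chain;
    private final Supplier<Object> target;
    private int currentIndex = -1;

    Invocation(List<Interceptor> chain, Supplier<Object> target) {
        this.chain = chain;
        this.target = target;
    }

    Object proceed() {
        // Every interceptor has been invoked: call the target itself.
        if (currentIndex == chain.size() - 1) {
            return target.get();
        }
        // Otherwise hand control to the next interceptor, which may call proceed() again.
        return chain.get(++currentIndex).invoke(this);
    }
}

class ChainDemo {

    /** Runs a two-interceptor chain and returns the order in which things happened. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Interceptor logging = inv -> {
            trace.add("log:before");
            Object result = inv.proceed();
            trace.add("log:after");
            return result;
        };
        Interceptor dispatch = inv -> {
            trace.add("dispatch");   // an async interceptor would submit to an executor here
            return inv.proceed();
        };
        Invocation invocation = new Invocation(Arrays.asList(logging, dispatch), () -> {
            trace.add("target");
            return "done";
        });
        invocation.proceed();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trace comes out as log:before, dispatch, target, log:after, which is the nesting that the recursive proceed() calls produce.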
The key step is the interceptor's invoke(this) call, which for an @Async method ends up in AsyncExecutionInterceptor.invoke:
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        // Thrown only when neither a method-specific nor a default executor could be resolved
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get(); // block until the result is available
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };
    // Submit the task to the executor
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
The submission itself happens in doSubmit:
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // If the CompletableFuture class is available, try the CompletableFuture path first
    if (completableFuturePresent) {
        Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
        if (result != null) {
            return result;
        }
    }
    // Return type is a ListenableFuture, which supports callbacks through addCallback
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // Return type is a plain Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // No return value
    else {
        executor.submit(task);
        return null;
    }
}
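The dispatch on the declared return type can be imitated with JDK classes only. The following is a rough sketch under that assumption, not the Spring method: SubmitDemo is hypothetical, the ListenableFuture branch is left out because that type is Spring-specific, and the CompletableFuture branch is written the way one would normally do it with supplyAsync.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {

    /** Picks a submission strategy from the declared return type of the async method. */
    static Object doSubmit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            // Checked exceptions must be wrapped, since Supplier cannot throw them.
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            executor.submit(task);   // void: fire and forget, nothing to hand back
            return null;
        }
    }

    /** Submits one task per return-type category and reports what came back. */
    static String describe() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Object a = doSubmit(() -> "x", pool, CompletableFuture.class);
            Object b = doSubmit(() -> "y", pool, Future.class);
            Object c = doSubmit(() -> "z", pool, void.class);
            return (a instanceof CompletableFuture) + "," + (b instanceof Future) + "," + (c == null);
        } finally {
            pool.shutdown();   // already submitted tasks still run to completion
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

The CompletableFuture check has to come first because CompletableFuture is itself a Future; testing for Future first would swallow that case.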
Overall flow
- Start from the annotation: @EnableAsync -> ProxyAsyncConfiguration defines a bean of type AsyncAnnotationBeanPostProcessor
- Then follow the lifecycle of the AsyncAnnotationBeanPostProcessor bean: AOP advisor initialization (setBeanFactory()) -> AOP proxy creation through AopProxy (postProcessAfterInitialization()) -> AOP advice execution at the pointcut (InvocationHandler.invoke)