【Spring技术实战】@Async机制的使用技巧以及异步注解源码解析

简介: 【Spring技术实战】@Async机制的使用技巧以及异步注解源码解析

前提背景


开启异步任务使用方法:


  1. 方法上加@Async注解
  2. 启动类或者配置类上@EnableAsync



@EnableAsync


源码注释


Enables Spring's asynchronous method execution capability, similar to functionality found in Spring's task:* XML namespace. To be used together with @Configuration classes as follows, enabling annotation-driven async processing for an entire Spring application context:

  • @EnableAsync主要的目的就是开启spring异步执行器,有点类似之前xml中的task标签配置,需要联合@Configuration注解一起使用。




使用模式
@Configuration
 @EnableAsync
 public class AppConfig {}
复制代码


MyAsyncBean is a user-defined type with one or more methods annotated with either Spring's @Async annotation, the EJB 3.1 @javax.ejb.Asynchronous annotation, or any custom annotation specified via the annotation() attribute. The aspect is added transparently for any registered bean, for instance via this configuration:


  • NyAsyncBean 是用户自己定义的一个或者多个通过@Async注解进行修饰的对象、或者EJB 3.1 @javax.ejb.Asynchronous注解,或者用户自定义的指定的注解操作属性,被注册到切面的功能方法。
@Configuration
 public class AnotherAppConfig {
     @Bean
     public MyAsyncBean asyncBean() {
         return new MyAsyncBean();
     }
 }
复制代码



By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.


  • 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为taskExecutor的Executor类型的bean,都不存在使用SimpleAsyncTaskExecutor执行器

To customize all this, implement AsyncConfigurer and provide: your own Executor through the getAsyncExecutor() method, and your own AsyncUncaughtExceptionHandler through the getAsyncUncaughtExceptionHandler() method.




覆盖定制化UncaughtExceptionHandler/getAsyncExecutor


  • 可实现AsyncConfigurer接口重写getAsyncExecutor获取异步执行器,getAsyncUncaughtExceptionHandler获取异步未捕获异常处理器
@Configuration
 @EnableAsync
 public class AppConfig implements AsyncConfigurer {
     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(7);
         executor.setMaxPoolSize(42);
         executor.setQueueCapacity(11);
         executor.setThreadNamePrefix("MyExecutor-");
         executor.initialize();
         return executor;
     }
     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return MyAsyncUncaughtExceptionHandler();
     }
 }
复制代码


If only one item needs to be customized, null can be returned to keep the default settings. Consider also extending from AsyncConfigurerSupport when possible. Note: In the above example the ThreadPoolTaskExecutor is not a fully managed Spring bean. Add the @Bean annotation to the getAsyncExecutor() method if you want a fully managed bean. In such circumstances it is no longer necessary to manually call the executor.initialize() method as this will be invoked automatically when the bean is initialized.



同比相关
<beans>
     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
 </beans>
复制代码



重点说明


The mode() attribute controls how advice is applied: If the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying. Please note that proxy mode allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way.


  • 这里就说明了@Async必须在不同方法中调用。

Note that if the mode() is set to AdviceMode.ASPECTJ, then the value of the proxyTargetClass() attribute will be ignored. Note also that in this case the spring-aspects module JAR must be present on the classpath, with compile-time weaving or load-time weaving applying the aspect to the affected classes. There is no proxy involved in such a scenario; local calls will be intercepted as well.//当然也可以用Aspect模式织入(需要引入spring-aspects模块需要的jar)




功能解析


@Async


  • 该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。
  • 入参随意,但返回值只能是void或者Future.(ListenableFuture接口/CompletableFuture类)
  • Future是代理返回的切实的异步返回,用以追踪异步方法的返回值。当然也可以使用AsyncResult类(实现ListenableFuture接口)(Spring或者EJB都有)或者CompletableFuture类
  • 加在类上表示整个类都使用,加在方法上会覆盖类上的设置
  • value字段用以限定执行方法的执行器名称(自定义):Executor或者TaskExecutor



@EnableAsync


  • 开启spring异步执行器,类似xml中的task标签配置,需要联合@Configuration注解一起使用
  • 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为taskExecutor的Executor类型的bean,都不存在使用SimpleAsyncTaskExecutor执行器
  • 针对于上面章节的【覆盖定制化UncaughtExceptionHandler/getAsyncExecutor】可实现AsyncConfigurer接口复写getAsyncExecutor获取异步执行器,getAsyncUncaughtExceptionHandler获取异步未捕获异常处理器
  • @Configuration
  • 注解类和xml基本一致,但是使用注解类还可以自定义线程名前缀(上面的AppConfig-》getAsyncExecutor-》setThreadNamePrefix)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
   /**该属性用来支持用户自定义异步注解,默认扫描spring的@Async和EJB3.1的@code @javax.ejb.Asynchronous
    * Indicate the 'async' annotation type to be detected at either class
    * or method level.
    * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
    * {@code @javax.ejb.Asynchronous} annotation will be detected.
    * <p>This attribute exists so that developers can provide their own
    * custom annotation type to indicate that a method (or all methods of
    * a given class) should be invoked asynchronously.
    */
   Class<? extends Annotation> annotation() default Annotation.class;
   /**标明是否需要创建CGLIB子类代理,AdviceMode=PROXY时才适用。注意设置为true时,其它spring管理的bean也会升级到CGLIB子类代理
    * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
    * to standard Java interface-based proxies.
    * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
    * <p>The default is {@code false}.
    * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
    * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
    * For example, other beans marked with Spring's {@code @Transactional} annotation
    * will be upgraded to subclass proxying at the same time. This approach has no
    * negative impact in practice unless one is explicitly expecting one type of proxy
    * vs. another &mdash; for example, in tests.
    */
   boolean proxyTargetClass() default false;
   /**标明异步通知将会如何实现,默认PROXY,如需支持同一个类中非异步方法调用另一个异步方法,需要设置为ASPECTJ
    * Indicate how async advice should be applied.
    * <p><b>The default is {@link AdviceMode#PROXY}.</b>
    * Please note that proxy mode allows for interception of calls through the proxy
    * only. Local calls within the same class cannot get intercepted that way; an
    * {@link Async} annotation on such a method within a local call will be ignored
    * since Spring's interceptor does not even kick in for such a runtime scenario.
    * For a more advanced mode of interception, consider switching this to
    * {@link AdviceMode#ASPECTJ}.
    */
   AdviceMode mode() default AdviceMode.PROXY;
   /**标明异步注解bean处理器应该遵循的执行顺序,默认最低的优先级(Integer.MAX_VALUE,值越小优先级越高)
    * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
    * should be applied.
    * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
    * after all other post-processors, so that it can add an advisor to
    * existing proxies rather than double-proxy.
    */
   int order() default Ordered.LOWEST_PRECEDENCE;
}
复制代码




执行流程


核心注解就是@Import(AsyncConfigurationSelector.class),就是属于ImportSelector接口的selectImports()方法,源码如下:


查询器:基于@EnableAsync中定义的模式AdviceMode加在@Configuration标记的类上,确定抽象异步配置类的实现类


/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY://如果配置的PROXY,使用ProxyAsyncConfiguration
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ://如果配置的ASPECTJ,使用ProxyAsyncConfiguration
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }
}
复制代码



JDK接口代理-选一个类ProxyAsyncConfiguration:
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
  if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
复制代码



  • 如果@EnableAsync中用户自定义了annotation属性,即异步注解类型
  • 新建一个异步注解bean后处理器
  • 设置Executor:设置线程任务执行器
  • AsyncUncaughtExceptionHandler:设置异常处理器
  • 设置是否升级到CGLIB子类代理,默认不开启
  • 设置执行优先级,默认最后执行



ProxyAsyncConfiguration就两点:


  1. 就是继承了AbstractAsyncConfiguration
  2. 定义了一个bean:AsyncAnnotationBeanPostProcessor



AbstractAsyncConfiguration
/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 * 抽象异步配置类,封装了通用结构,用以支持spring的异步方法执行能力
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
    protected AnnotationAttributes enableAsync;//enableAsync的注解属性
    protected Executor executor;//Doug Lea老李头设计的线程任务执行器
    protected AsyncUncaughtExceptionHandler exceptionHandler;//异常处理器
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }
    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }
}
复制代码



主要设置为一下三点


  • 1)注解属性
  • 2)异步任务执行器
  • 3)异常处理器


方法:


  • 1)setImportMetadata 设置注解属性
  • 2)setConfigurers 设置异步任务执行器和异常处理器



AsyncAnnotationBeanPostProcessor


AsyncAnnotationBeanPostProcessor这个类的Bean 初始化时 : BeanFactoryAware接口setBeanFactory方法中,对AsyncAnnotationAdvisor异步注解切面进行了构造。

@Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }
复制代码


AsyncAnnotationAdvisor:


通过AsyncAnnotationAdvisor建立异步化的AOP切面拦截操作处理机制

image.png


  • AsyncAnnotationBeanPostProcessor -> postProcessAfterInitialization()
  • 具体的后置处理:AsyncAnnotationBeanPostProcessor的后置bean处理是通过其父类AbstractAdvisingBeanPostProcessor来实现的,
  • 该类实现了BeanPostProcessor接口,复写postProcessAfterInitialization方法
@Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
        //把Advisor添加进bean  ProxyFactory-》AdvisedSupport-》Advised
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }
        //构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy
        if (isEligible(bean, beanName)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);
            return proxyFactory.getProxy(getProxyClassLoader());
        }
        // No async proxy needed.
        return bean;
    }
复制代码



  • isEligible用于判断这个类或者这个类中的某个方法是否含有注解,AsyncAnnotationAdvisor 实现了PointcutAdvisor接口。
  • 代理AopProxy接口,我们这里最终实际生成的是JdkDynamicAopProxy
// 得到方法的拦截器链
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
    // We can skip creating a MethodInvocation: just invoke the target directly
    // Note that the final invoker must be an InvokerInterceptor so we know it does
    // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
    // 构造
    invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
    // Proceed to the joinpoint through the interceptor chain.
    retVal = invocation.proceed();
}
复制代码



  • @Async注解的拦截器是AsyncExecutionInterceptor,它继承了MethodInterceptor接口。而MethodInterceptor就是AOP规范中的Advice(切点的处理器)。
  • chain不为空,执行第二个分支,构造ReflectiveMethodInvocation,然后执行proceed方法。
@Override
    public Object proceed() throws Throwable {
        //    如果没有拦截器,直接执行被代理的方法
        if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
            return invokeJoinpoint();
        }
        Object interceptorOrInterceptionAdvice =
         this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
            // Evaluate dynamic method matcher here: static part will already have
            // been evaluated and found to match.
            InterceptorAndDynamicMethodMatcher dm =
                    (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
            if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
                return dm.interceptor.invoke(this);
            }
            else {
                // Dynamic matching failed.
                // Skip this interceptor and invoke the next in the chain.
                return proceed();
            }
        }
        else {
            // It's an interceptor, so we just invoke it: The pointcut will have
            // been evaluated statically before this object was constructed.
            return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
        }
    }
复制代码



核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),实际就是执行了AsyncExecutionInterceptor.invoke。

public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(//如果没有自定义异步任务执行器,报下面这行错,不用管,可以默认执行
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();//阻塞等待执行完毕得到结果
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };
        //提交有任务给执行器
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
复制代码




执行核心方法doSubmit

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (completableFuturePresent) {//先判断是否存在CompletableFuture这个类,优先使用CompletableFuture执行任务
            Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
            if (result != null) {
                return result;
            }
        }//返回值是可监听Future,定义过回调函数:addCallback
        if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }//返回值是Future
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {//没有返回值
            executor.submit(task);
            return null;
        }
}
复制代码




总体流程


  • 从注解开始:@EnableAsync -> ProxyAsyncConfiguration类构造一个bean(类型:AsyncAnnotationBeanPostProcessor)


  • 从AsyncAnnotationBeanPostProcessor这个类的bean的生命周期走:AOP-Advisor切面初始化(setBeanFactory())-> AOP-生成代理类AopProxy(postProcessAfterInitialization())-> AOP-切点执行(InvocationHandler.invoke)







相关文章
|
4天前
|
运维 Java 程序员
Spring5深入浅出篇:基于注解实现的AOP
# Spring5 AOP 深入理解:注解实现 本文介绍了基于注解的AOP编程步骤,包括原始对象、额外功能、切点和组装切面。步骤1-3旨在构建切面,与传统AOP相似。示例代码展示了如何使用`@Around`定义切面和执行逻辑。配置中,通过`@Aspect`和`@Around`注解定义切点,并在Spring配置中启用AOP自动代理。 进一步讨论了切点复用,避免重复代码以提高代码维护性。通过`@Pointcut`定义通用切点表达式,然后在多个通知中引用。此外,解释了AOP底层实现的两种动态代理方式:JDK动态代理和Cglib字节码增强,默认使用JDK,可通过配置切换到Cglib
|
2天前
|
JavaScript Java 开发者
Spring Boot中的@Lazy注解:概念及实战应用
【4月更文挑战第7天】在Spring Framework中,@Lazy注解是一个非常有用的特性,它允许开发者控制Spring容器的bean初始化时机。本文将详细介绍@Lazy注解的概念,并通过一个实际的例子展示如何在Spring Boot应用中使用它。
14 2
|
4天前
|
前端开发 Java
SpringBoot之自定义注解参数校验
SpringBoot之自定义注解参数校验
14 2
|
10天前
|
Java 测试技术 开发者
【亮剑】如何通过自定义注解来实现 Spring AOP,以便更加灵活地控制方法的拦截和增强?
【4月更文挑战第30天】通过自定义注解实现Spring AOP,可以更灵活地控制方法拦截和增强。首先定义自定义注解,如`@MyCustomAnnotation`,然后创建切面类`MyCustomAspect`,使用`@Pointcut`和`@Before/@After`定义切点及通知。配置AOP代理,添加`@EnableAspectJAutoProxy`到配置类。最后,在需拦截的方法上应用自定义注解。遵循保持注解职责单一、选择合适保留策略等最佳实践,提高代码可重用性和可维护性。记得测试AOP逻辑。
|
10天前
|
Java Spring
springboot自带的@Scheduled注解开启定时任务
springboot自带的@Scheduled注解开启定时任务
|
11天前
|
人工智能 算法 Java
程序设计的艺术:深入解析与实战演练
程序设计的艺术:深入解析与实战演练
24 4
|
11天前
|
存储 Java 大数据
JAVA:编程的艺术与实战解析
JAVA:编程的艺术与实战解析
19 2
|
13天前
|
XML JSON Java
【SpringBoot】springboot常用注解
【SpringBoot】springboot常用注解
|
13天前
|
Java 数据库 开发者
了解Spring Boot:重要注解详解
了解Spring Boot:重要注解详解
|
14天前
|
Java Spring 容器
SpringBoot自动装配原理之@Import注解解析
SpringBoot自动装配原理之@Import注解解析
53 0

推荐镜像

更多