【小家Spring】Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)(中)

简介: 【小家Spring】Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)(中)

AbstractAdvisingBeanPostProcessor


从这个名字也能看出来。它主要处理AdvisingBean,也就是处理Advisor和Bean的关系的


// 它继承自,ProxyProcessorSupport,说明它也拥有AOp的通用配置
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
  @Nullable
  protected Advisor advisor;
  protected boolean beforeExistingAdvisors = false;
  // 缓存合格的Bean们
  private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);
  // 当遇到一个pre-object的时候,是否把该processor所持有得advisor放在现有的增强器们之前执行
  // 默认是false,会放在最后一个位置上的
  public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
    this.beforeExistingAdvisors = beforeExistingAdvisors;
  }
  // 不处理
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) {
    return bean;
  }
  // Bean已经实例化、初始化完成之后执行。
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {
    // 忽略AopInfrastructureBean的Bean,并且如果没有advisor也会忽略不处理~~~~~
    if (bean instanceof AopInfrastructureBean || this.advisor == null) {
      // Ignore AOP infrastructure such as scoped proxies.
      return bean;
    }
    // 如果这个Bean已经被代理过了(比如已经被AOP切中了),那本处就无需再重复创建代理了嘛
    // 直接向里面添加advisor就成了
    if (bean instanceof Advised) {
      Advised advised = (Advised) bean;
      // 注意此advised不能是已经被冻结了的。且源对象必须是Eligible合格的
      if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
        // Add our local Advisor to the existing proxy's Advisor chain...
        // 把自己持有的这个advisor放在首位(如果beforeExistingAdvisors=true)
        if (this.beforeExistingAdvisors) {
          advised.addAdvisor(0, this.advisor);
        }
        // 否则就是尾部位置
        else {
          advised.addAdvisor(this.advisor);
        }
        // 最终直接返回即可,因为已经没有必要再创建一次代理对象了
        return bean;
      }
    }
    // 如果这个Bean事合格的(此方法下面有解释)   这个时候是没有被代理过的
    if (isEligible(bean, beanName)) {
      // 以当前的配置,创建一个ProxyFactory 
      ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
      // 如果不是使用CGLIB常见代理,那就去分析出它所实现的接口们  然后放进ProxyFactory 里去
      if (!proxyFactory.isProxyTargetClass()) {
        evaluateProxyInterfaces(bean.getClass(), proxyFactory);
      }
      // 切面就是当前持有得advisor
      proxyFactory.addAdvisor(this.advisor);
      // 留给子类,自己还可以对proxyFactory进行自定义~~~~~
      customizeProxyFactory(proxyFactory);
      // 最终返回这个代理对象~~~~~
      return proxyFactory.getProxy(getProxyClassLoader());
    }
    // No async proxy needed.(相当于没有做任何的代理处理,返回原对象)
    return bean;
  }
  // 检查这个Bean是否是合格的
  protected boolean isEligible(Object bean, String beanName) {
    return isEligible(bean.getClass());
  }
  protected boolean isEligible(Class<?> targetClass) {
    // 如果已经被缓存着了,那肯定靠谱啊
    Boolean eligible = this.eligibleBeans.get(targetClass);
    if (eligible != null) {
      return eligible;
    }
    // 如果没有切面(就相当于没有给配置增强器,那铁定是不合格的)
    if (this.advisor == null) {
      return false;
    }
    // 这个重要了:看看这个advisor是否能够切入进targetClass这个类,能够切入进取的也是合格的
    eligible = AopUtils.canApply(this.advisor, targetClass);
    this.eligibleBeans.put(targetClass, eligible);
    return eligible;
  }
  // 子类可以复写。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就复写了这个方法~~~
  protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.copyFrom(this);
    proxyFactory.setTarget(bean);
    return proxyFactory;
  }
  // 子类复写~
  protected void customizeProxyFactory(ProxyFactory proxyFactory) {
  }
}


看看它的继承图:


image.png


MethodValidationPostProcessor属于JSR-303校验方面的范畴,不是本文的内容,因此不会讲述


AbstractBeanFactoryAwareAdvisingPostProcessor

从名字可以看出,它相较于父类,就和BeanFactory有关了,也就是和Bean容器相关了~~~

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
    implements BeanFactoryAware {
  // Bean工厂
  @Nullable
  private ConfigurableListableBeanFactory beanFactory;
  // 如果这个Bean工厂不是ConfigurableListableBeanFactory ,那就set一个null
  // 我们的`DefaultListableBeanFactory`显然就是它的子类~~~~~
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
        (ConfigurableListableBeanFactory) beanFactory : null);
  }
  @Override
  protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    // 如果Bean工厂是正常的,那就把这个Bean  expose一个特殊的Bean,记录下它的类型
    if (this.beanFactory != null) {
      AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
    }
    ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
    // 这里创建代理也是和`AbstractAutoProxyCreator`差不多的逻辑。
    // 如果没有显示的设置为CGLIB,并且toProxyUtils.shouldProxyTargetClass还被暴露过时一个特殊的Bean,那就强制使用CGLIB代理吧    这里一般和Scope无关的话,都返回false了
    if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
        AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
      proxyFactory.setProxyTargetClass(true);
    }
    return proxyFactory;
  }
}


下面就可以看看具体的两个实现类了:


AsyncAnnotationBeanPostProcessor


该实现类就是具体和@Async相关的一个类了~~~


public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
  // 建议换成AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME  这样语意更加的清晰些
  public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
  // 注解类型
  @Nullable
  private Class<? extends Annotation> asyncAnnotationType;
  // 异步的执行器
  @Nullable
  private Executor executor;
  // 异步异常处理器
  @Nullable
  private AsyncUncaughtExceptionHandler exceptionHandler;
  // 此处特别注意:这里设置为true,也就是说@Async的Advisor会放在首位
  public AsyncAnnotationBeanPostProcessor() {
    setBeforeExistingAdvisors(true);
  }
  // 可以设定需要扫描哪些注解类型。默认只扫描@Async以及`javax.ejb.Asynchronous`这个注解
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    this.asyncAnnotationType = asyncAnnotationType;
  }
  // 如果没有指定。那就将执行全局得默认查找。在上下文中查找唯一的`TaskExecutor`类型的Bean,或者一个名称为`taskExecutor`的Executor
  // 当然,如果上面途径都没找到。那就会采用一个默认的任务池
  public void setExecutor(Executor executor) {
    this.executor = executor;
  }
  public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
    this.exceptionHandler = exceptionHandler;
  }
  // 重写了父类的方法。然后下面:自己new了一个AsyncAnnotationAdvisor ,传入executor和exceptionHandler
  // 并且最终this.advisor = advisor 
  // 因此可议看出:AsyncAnnotationAdvisor 才是重点了。它定义了它的匹配情况~~~~
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
  }
}


AsyncAnnotationAdvisor


可以看出,它是一个PointcutAdvisor,并且Pointcut是一个AnnotationMatchingPointcut,因此是为注解来匹配的


public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
  private AsyncUncaughtExceptionHandler exceptionHandler;
  // 增强器
  private Advice advice;
  // 切点
  private Pointcut pointcut;
  // 两个都为null,那就是都会采用默认的方案
  public AsyncAnnotationAdvisor() {
    this(null, null);
  }
  // 创建一个AsyncAnnotationAdvisor实例,可以自己指定Executor 和 AsyncUncaughtExceptionHandler 
  @SuppressWarnings("unchecked")
  public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
    // 这里List长度选择2,应为绝大部分情况下只会支持这两种@Async和@Asynchronous
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
      asyncAnnotationTypes.add((Class<? extends Annotation>)
          ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
      // If EJB 3.1 API not present, simply ignore.
    }
    if (exceptionHandler != null) {
      this.exceptionHandler = exceptionHandler;
    }
    // 若没指定,那就使用默认的SimpleAsyncUncaughtExceptionHandler(它仅仅是输出了一句日志而已)
    else {
      this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
    }
    // 这两个方法是重点,下面会重点介绍
    this.advice = buildAdvice(executor, this.exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
  }
  // 如果set了Executor,advice会重新构建。
  public void setTaskExecutor(Executor executor) {
    this.advice = buildAdvice(executor, this.exceptionHandler);
  }
  // 这里注意:如果你自己指定了注解类型。那么将不再扫描其余两个默认的注解,因此pointcut也就需要重新构建了
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
    asyncAnnotationTypes.add(asyncAnnotationType);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
  }
  // 如果这个advice也实现了BeanFactoryAware,那就也把BeanFactory放进去
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    if (this.advice instanceof BeanFactoryAware) {
      ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
    }
  }
  @Override
  public Advice getAdvice() {
    return this.advice;
  }
  @Override
  public Pointcut getPointcut() {
    return this.pointcut;
  }
  // 这个最终又是委托给`AnnotationAsyncExecutionInterceptor`,它是一个具体的增强器,有着核心内容
  protected Advice buildAdvice(@Nullable Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
    return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
  }
  // Calculate a pointcut for the given async annotation types, if any
  protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    // 采用一个组合切面:ComposablePointcut (因为可能需要支持多个注解嘛)
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
      // 这里为何new出来两个AnnotationMatchingPointcut??????
      // 第一个:类匹配(只需要类上面有这个注解,所有的方法都匹配)this.methodMatcher = MethodMatcher.TRUE;
      // 第二个:方法匹配。所有的类都可议。但是只有方法上有这个注解才会匹配上
      Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
      Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
      if (result == null) {
        result = new ComposablePointcut(cpc);
      }
      else {
        result.union(cpc);
      }
      // 最终的结果都是取值为并集的~~~~~~~
      result = result.union(mpc);
    }
    //  最后一个处理厉害了:也就是说你啥类型都木有的情况下,是匹配所有类的所有方法~~~
    return (result != null ? result : Pointcut.TRUE);
  }
}


从上的源码可议看出,默认是支持@Asycn以及EJB的那个异步注解的。但是最终的增强行为,委托给了AnnotationAsyncExecutionInterceptor


AnnotationAsyncExecutionInterceptor:@Async拦截器


image.png


可知,它是一个MethodInterceptor,并且继承自AsyncExecutionAspectSupport


AsyncExecutionAspectSupport


从类名就可以看出,它是用来支持处理异步线程执行器的,若没有指定,靠它提供一个默认的异步执行器。


public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
  // 这是备选的。如果找到多个类型为TaskExecutor的Bean,才会备选的再用这个名称去找的~~~
  public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
  // 缓存~~~AsyncTaskExecutor是TaskExecutor的子接口
  // 从这可以看出:不同的方法,对应的异步执行器还不一样咯~~~~~~
  private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
  // 默认的线程执行器
  @Nullable
  private volatile Executor defaultExecutor;
  // 异步异常处理器
  private AsyncUncaughtExceptionHandler exceptionHandler;
  // Bean工厂
  @Nullable
  private BeanFactory beanFactory;
  public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
    this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
  }
  public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
    this.defaultExecutor = defaultExecutor;
    this.exceptionHandler = exceptionHandler;
  }
  public void setExecutor(Executor defaultExecutor) {
    this.defaultExecutor = defaultExecutor;
  }
  public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
    this.exceptionHandler = exceptionHandler;
  }
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    this.beanFactory = beanFactory;
  }
  // 该方法是找到一个异步执行器,去执行这个方法~~~~~~
  @Nullable
  protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // 如果缓存中能够找到该方法对应的执行器,就立马返回了
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
      Executor targetExecutor;
      // 抽象方法:`AnnotationAsyncExecutionInterceptor`有实现。就是@Async注解的value值
      String qualifier = getExecutorQualifier(method);
      // 现在知道@Async直接的value值的作用了吧。就是制定执行此方法的执行器的(容器内执行器的Bean的名称)
      // 当然有可能为null。注意此处是支持@Qualified注解标注在类上来区分Bean的
      // 注意:此处targetExecutor仍然可能为null
      if (StringUtils.hasLength(qualifier)) {
        targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
      }
      // 注解没有指定value值,那就去找默认的执行器
      else {
        targetExecutor = this.defaultExecutor;
        if (targetExecutor == null) {
          // 去找getDefaultExecutor()~~~
          synchronized (this.executors) {
            if (this.defaultExecutor == null) {
              this.defaultExecutor = getDefaultExecutor(this.beanFactory);
            }
            targetExecutor = this.defaultExecutor;
          }
        }
      }
      // 若还未null,那就返回null吧
      if (targetExecutor == null) {
        return null;
      }
      // 把targetExecutor 包装成一个AsyncTaskExecutor返回,并且缓存起来。
      // TaskExecutorAdapter就是AsyncListenableTaskExecutor的一个实现类
      executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
          (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
      this.executors.put(method, executor);
    }
    return executor;
  }
  // 子类去复写此方法。也就是拿到对应的key,从而方便找bean吧(执行器)
  @Nullable
  protected abstract String getExecutorQualifier(Method method);
  // @since 4.2.6  也就是根据
  @Nullable
  protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
    if (beanFactory == null) {
      throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
          " to access qualified executor '" + qualifier + "'");
    }
    return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
  }
  // @since 4.2.6 
  // Retrieve or build a default executor for this advice instance  检索或者创建一个默认的executor 
  @Nullable
  protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
      // 这个处理很有意思,它是用用的try catch的技巧去处理的
      try {
        // 如果容器内存在唯一的TaskExecutor(子类),就直接返回了
        return beanFactory.getBean(TaskExecutor.class);
      }
      catch (NoUniqueBeanDefinitionException ex) {
        // 这是出现了多个TaskExecutor类型的话,那就按照名字去拿  `taskExecutor`且是Executor类型
        try {
          return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
        }
        // 如果再没有找到,也不要报错,而是接下来创建一个默认的处理器
        // 这里输出一个info信息
        catch (NoSuchBeanDefinitionException ex2) {
        }
      }
      catch (NoSuchBeanDefinitionException ex) {
        try {
          return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
        }
        catch (NoSuchBeanDefinitionException ex2) {
        }
        // 这里还没有获取到,就放弃。 用本地默认的executor吧~~~
        // 子类可以去复写此方法,发现为null的话可议给一个默认值~~~~比如`AsyncExecutionInterceptor`默认给的就是`SimpleAsyncTaskExecutor`作为执行器的
        // Giving up -> either using local default executor or none at all...
      }
    }
    return null;
  }
  //Delegate for actually executing the given task with the chosen executor
  // 用选定的执行者实际执行给定任务的委托
  @Nullable
  protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // 根据不同的返回值类型,来采用不同的方案去异步执行,但是执行器都是executor
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          return task.call();
        }
        catch (Throwable ex) {
          throw new CompletionException(ex);
        }
      }, executor);
    }
    // ListenableFuture接口继承自Future  是Spring自己扩展的一个接口。
    // 同样的AsyncListenableTaskExecutor也是Spring扩展自AsyncTaskExecutor的
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
      return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // 普通的submit
    else if (Future.class.isAssignableFrom(returnType)) {
      return executor.submit(task);
    }
    // 没有返回值的情况下  也用sumitt提交,按时返回null
    else {
      executor.submit(task);
      return null;
    }
  }
  // 处理错误
  protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
    // 如果方法的返回值类型是Future,就rethrowException,表示直接throw出去
    if (Future.class.isAssignableFrom(method.getReturnType())) {
      ReflectionUtils.rethrowException(ex);
    } else {
      // Could not transmit the exception to the caller with default executor
      try {
        this.exceptionHandler.handleUncaughtException(ex, method, params);
      } catch (Throwable ex2) {
        logger.error("Exception handler for async method '" + method.toGenericString() +
            "' threw unexpected exception itself", ex2);
      }
    }
  }
}

这个类相当于已经做了大部分的工作了,接下来继续看子类:

相关文章
|
23小时前
|
安全 Java Spring
Spring之Aop的底层原理
Spring之Aop的底层原理
|
23小时前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
17 0
|
23小时前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
35 6
|
23小时前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
23小时前
|
安全 Java API
Spring工厂API与原理
Spring工厂API与原理
35 10
|
23小时前
|
Java 应用服务中间件 Maven
SpringBoot 项目瘦身指南
SpringBoot 项目瘦身指南
56 0
|
23小时前
|
缓存 安全 Java
Spring Boot 面试题及答案整理,最新面试题
Spring Boot 面试题及答案整理,最新面试题
138 0
|
23小时前
|
存储 JSON Java
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
46 2
|
23小时前
|
前端开发 搜索推荐 Java
【Spring底层原理高级进阶】基于Spring Boot和Spring WebFlux的实时推荐系统的核心:响应式编程与 WebFlux 的颠覆性变革
【Spring底层原理高级进阶】基于Spring Boot和Spring WebFlux的实时推荐系统的核心:响应式编程与 WebFlux 的颠覆性变革
|
23小时前
|
前端开发 Java 应用服务中间件
Springboot对MVC、tomcat扩展配置
Springboot对MVC、tomcat扩展配置