【小家Spring】Spring任务调度@Scheduled的使用以及原理、源码分析(@EnableScheduling)(上)

简介: 【小家Spring】Spring任务调度@Scheduled的使用以及原理、源码分析(@EnableScheduling)(上)

前言


JDK给我们提供了定时任务的能力,详解之前有篇博文:

【小家java】Java定时任务ScheduledThreadPoolExecutor详解以及与Timer、TimerTask的区别(执行指定次数停止任务)


而Spring基于此做了更便捷的封装,使得我们使用起来异常的方便~


定时任务也是平时开发不可缺少的一个使用场景,本文主要看看Spring是怎么来实现这一套逻辑的?


Demo


@Service
public class HelloServiceImpl implements HelloService {
    // @Schedules
    // 它是允许重复注解的~~~~
    //@Scheduled(cron = "0/5 * * * * ?")
    @Scheduled(cron = "0/2 * * * * ?") // 每2秒钟执行一次
    public void job1() {
        System.out.println("我执行了~~" + LocalTime.now());
    }
}


然后让Spring开启定时任务的支持即可。


@Configuration
@EnableScheduling // 开启定时任务
public class RootConfig {
}

输出如下:每两秒钟执行一次


我执行了~~14:46:36.003
我执行了~~14:46:38.002
我执行了~~14:46:40.001
我执行了~~14:46:42.001
我执行了~~14:46:44.002
...



原理、源码分析


很显然又是@EnableXXX的设计模式,因此入口又从该注解开始吧~~~


@EnableScheduling


//@since 3.1
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}


可以看出它没有任何属性。所以只有一个@Import在起作用,因此重点看看SchedulingConfiguration


它的效果同XML中的<task:annotation-driven>


SchedulingConfiguration


@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
  @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
    return new ScheduledAnnotationBeanPostProcessor();
  }
}


同样的,也非常非常的简单。只是向容器注入了一个ScheduledAnnotationBeanPostProcessor,它是一个PostProcessor,同时也是一个SmartInitializingSingleton等等


ScheduledAnnotationBeanPostProcessor


Scheduled注解后处理器,项目启动时会扫描所有标记了@Scheduled注解的方法,封装成ScheduledTask注册起来。


这个处理器是处理定时任务的核心类,比较复杂。下面也是结合源码,来看看它具体的一个工作内容:


// 首先:非常震撼的是,它实现的接口非常的多。还好的是,大部分接口我们都很熟悉了。
// MergedBeanDefinitionPostProcessor:它是个BeanPostProcessor
// DestructionAwareBeanPostProcessor:在销毁此Bean的时候,会调用对应方法
// SmartInitializingSingleton:它会在所有的单例Bean都完成了初始化后,调用这个接口的方法
// EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware:都是些感知接口
// DisposableBean:该Bean销毁的时候会调用
// ApplicationListener<ContextRefreshedEvent>:监听容器的`ContextRefreshedEvent`事件
// ScheduledTaskHolder:维护本地的ScheduledTask实例
public class ScheduledAnnotationBeanPostProcessor
    implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
    Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
    SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
  /**
   * The default name of the {@link TaskScheduler} bean to pick up: "taskScheduler".
   * <p>Note that the initial lookup happens by type; this is just the fallback
   * in case of multiple scheduler beans found in the context.
   * @since 4.2
   */
   // 看着注释就知道,和@Async的默认处理一样~~~~先类型  在回退到名称
  public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
  // 调度器(若我们没有配置,它是null的)
  @Nullable
  private Object scheduler;
  // 这些都是Awire感知接口注入进来的~~
  @Nullable
  private StringValueResolver embeddedValueResolver;
  @Nullable
  private String beanName;
  @Nullable
  private BeanFactory beanFactory;
  @Nullable
  private ApplicationContext applicationContext;
  // ScheduledTaskRegistrar:ScheduledTask注册中心,ScheduledTaskHolder接口的一个重要的实现类,维护了程序中所有配置的ScheduledTask
  // 内部会处理调取器得工作,因此我建议先移步,看看这个类得具体分析
  private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
  // 缓存,没有被标注注解的class们  
  // 这有个技巧,使用了newSetFromMap,自然而然的这个set也就成了一个线程安全的set
  private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
  // 缓存对应的Bean上  里面对应的 ScheduledTask任务。可议有多个哦~~
  // 注意:此处使用了IdentityHashMap
  private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
  // 希望此processor是最后执行的~
  @Override
  public int getOrder() {
    return LOWEST_PRECEDENCE;
  }
   //Set the {@link org.springframework.scheduling.TaskScheduler} that will invoke the scheduled methods
   // 也可以是JDK的ScheduledExecutorService(内部会给你包装成一个TaskScheduler)
   // 若没有指定。那就会走默认策略:去从起中先按照类型找`TaskScheduler`该类型(或者ScheduledExecutorService这个类型也成)的。
   // 若有多个该类型或者找不到,就安好"taskScheduler"名称去找 
   // 再找不到,就用系统默认的:
  public void setScheduler(Object scheduler) {
    this.scheduler = scheduler;
  }
  ...
  // 此方法会在该容器内所有的单例Bean已经初始化全部结束后,执行
  @Override
  public void afterSingletonsInstantiated() {
    // Remove resolved singleton classes from cache
    // 因为已经是最后一步了,所以这个缓存可议清空了
    this.nonAnnotatedClasses.clear();
    // 在容器内运行,ApplicationContext都不会为null
    if (this.applicationContext == null) {
      // Not running in an ApplicationContext -> register tasks early...
      // 如果不是在ApplicationContext下运行的,那么就应该提前注册这些任务
      finishRegistration();
    }
  }
  // 兼容容器刷新的时间(此时候容器硬启动完成了)   它还在`afterSingletonsInstantiated`的后面执行
  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    // 这个动作务必要做:因为Spring可能有多个容器,所以可能会发出多个ContextRefreshedEvent 事件
    // 显然我们只处理自己容器发出来得事件,别的容器发出来我不管~~
    if (event.getApplicationContext() == this.applicationContext) {
      // Running in an ApplicationContext -> register tasks this late...
      // giving other ContextRefreshedEvent listeners a chance to perform
      // their work at the same time (e.g. Spring Batch's job registration).
      // 为其他ContextRefreshedEvent侦听器提供同时执行其工作的机会(例如,Spring批量工作注册)
      finishRegistration();
    }
  }
  private void finishRegistration() {
    // 如果setScheduler了,就以调用者指定的为准~~~
    if (this.scheduler != null) {
      this.registrar.setScheduler(this.scheduler);
    }
    // 这里继续厉害了:从容器中找到所有的接口`SchedulingConfigurer`的实现类(我们可议通过实现它定制化scheduler)
    if (this.beanFactory instanceof ListableBeanFactory) {
      Map<String, SchedulingConfigurer> beans =
          ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
      List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
      // 同@Async只允许设置一个不一样的是,这里每个都会让它生效
      // 但是平时使用,我们自顶一个类足矣~~~
      AnnotationAwareOrderComparator.sort(configurers);
      for (SchedulingConfigurer configurer : configurers) {
        configurer.configureTasks(this.registrar);
      }
    }
    // 至于task是怎么注册进registor的,请带回看`postProcessAfterInitialization`这个方法的实现
    // 有任务并且registrar.getScheduler() == null,那就去容器里找来试试~~~
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
      ...
      // 这块逻辑和@Async的处理一毛一样。忽略了 主要看看resolveSchedulerBean()这个方法即可
    }
    this.registrar.afterPropertiesSet();
  }
  // 从容器中去找一个
  private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
    // 若按名字去查找,那就按照名字找
    if (byName) {
      T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType);
      // 这个处理非常非常有意思,就是说倘若找到了你可以在任意地方直接@Autowired这个Bean了,可以拿这个共用Scheduler来调度我们自己的任务啦~~
      if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory) {
        ((ConfigurableBeanFactory) this.beanFactory).registerDependentBean(
            DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName);
      }
      return scheduler;
    }
    // 按照schedulerType该类型的名字匹配resolveNamedBean  底层依赖:getBeanNamesForType
    else if (beanFactory instanceof AutowireCapableBeanFactory) {
      NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType);
      if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) {
        ((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName);
      }
      return holder.getBeanInstance();
    }
    // 按照类型找
    else {
      return beanFactory.getBean(schedulerType);
    }
  }
  @Override
  public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
  }
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) {
    return bean;
  }
  // Bean初始化完成后执行。去看看Bean里面有没有标注了@Scheduled的方法~~
  @Override
  public Object postProcessAfterInitialization(final Object bean, String beanName) {
    // 拿到目标类型(因为此类有可能已经被代理过)
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // 这里对没有标注注解的类做了一个缓存,防止从父去扫描(毕竟可能有多个容器,可能有重复扫描的现象)
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
      // 如下:主要用到了MethodIntrospector.selectMethods  这个内省方法工具类的这个g工具方法,去找指定Class里面,符合条件的方法们
      Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
          (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
            //过滤Method的核心逻辑就是是否标注有此注解(Merged表示标注在父类、或者接口处也是ok的)
            Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                method, Scheduled.class, Schedules.class);
            return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
          });
      if (annotatedMethods.isEmpty()) {
        this.nonAnnotatedClasses.add(targetClass);
        if (logger.isTraceEnabled()) {
          logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
        }
      }
      // 此处相当于已经找到了对应的注解方法~~~
      else {
        // Non-empty set of methods
        // 这里有一个双重遍历。因为一个方法上,可能重复标注多个这样的注解~~~~~
        // 所以最终遍历出来后,就交给processScheduled(scheduled, method, bean)去处理了
        annotatedMethods.forEach((method, scheduledMethods) ->
            scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
        if (logger.isDebugEnabled()) {
          logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
        }
      }
    }
    return bean;
  }
  // 这个方法就是灵魂了。就是执行这个注解,最终会把这个任务注册进去,并且启动的~~~
  protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
      // 标注此注解的方法必须是无参的方法
      Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
      // 拿到最终要被调用的方法  做这么一步操作主要是防止方法被代理了
      Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
      // 把该方法包装成一个Runnable 线程~~~
      Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
      boolean processedSchedule = false;
      String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
      // 装载任务,这里长度定为4,因为Spring认为标注4个注解还不够你用的?
      Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
      // Determine initial delay
      // 计算出延时多长时间执行 initialDelayString 支持占位符如:@Scheduled(fixedDelayString = "${time.fixedDelay}")
      // 这段话得意思是,最终拿到一个initialDelay值~~~~~Long型的
      long initialDelay = scheduled.initialDelay();
      String initialDelayString = scheduled.initialDelayString();
      if (StringUtils.hasText(initialDelayString)) {
        Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
        if (this.embeddedValueResolver != null) {
          initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
        }
        if (StringUtils.hasLength(initialDelayString)) {
          try {
            initialDelay = parseDelayAsLong(initialDelayString);
          }
          catch (RuntimeException ex) {
            throw new IllegalArgumentException(
                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
          }
        }
      }
      // Check cron expression
      // 解析cron
      String cron = scheduled.cron();
      if (StringUtils.hasText(cron)) {
        String zone = scheduled.zone();
        // 由此课件,cron也可以使用占位符。把它配置在配置文件里就成~~~zone也是支持占位符的
        if (this.embeddedValueResolver != null) {
          cron = this.embeddedValueResolver.resolveStringValue(cron);
          zone = this.embeddedValueResolver.resolveStringValue(zone);
        }
        if (StringUtils.hasLength(cron)) {
          Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
          processedSchedule = true;
          TimeZone timeZone;
          if (StringUtils.hasText(zone)) {
            timeZone = StringUtils.parseTimeZoneString(zone);
          }
          else {
            timeZone = TimeZone.getDefault();
          }
          // 这个相当于,如果配置了cron,它就是一个task了,就可以吧任务注册进registrar里面了
          // 这里面的处理是。如果已经有调度器taskScheduler了,那就立马准备执行了                             
          tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
        }
      }
      // At this point we don't need to differentiate between initial delay set or not anymore
      if (initialDelay < 0) {
        initialDelay = 0;
      }
      ...
      // 下面就不再说了,就是解析fixed delay、fixed rated、
      // Check whether we had any attribute set
      Assert.isTrue(processedSchedule, errorMessage);
      // Finally register the scheduled tasks
      // 最后吧这些任务都放在全局属性里保存起来~~~~
      // getScheduledTasks()方法是会把所有的任务都返回出去的~~~ScheduledTaskHolder接口就一个Set<ScheduledTask> getScheduledTasks();方法嘛
      synchronized (this.scheduledTasks) {
        Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
        if (registeredTasks == null) {
          registeredTasks = new LinkedHashSet<>(4);
          this.scheduledTasks.put(bean, registeredTasks);
        }
        registeredTasks.addAll(tasks);
      }
    }
    catch (IllegalArgumentException ex) {
      throw new IllegalStateException(
          "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
  }
  private static long parseDelayAsLong(String value) throws RuntimeException {
    if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
      return Duration.parse(value).toMillis();
    }
    return Long.parseLong(value);
  }
  private static boolean isP(char ch) {
    return (ch == 'P' || ch == 'p');
  }
   //@since 5.0.2 获取到所有的任务。包含本实例的,以及registrar(手动注册)的所有任务
  @Override
  public Set<ScheduledTask> getScheduledTasks() {
    Set<ScheduledTask> result = new LinkedHashSet<>();
    synchronized (this.scheduledTasks) {
      Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
      for (Set<ScheduledTask> tasks : allTasks) {
        result.addAll(tasks);
      }
    }
    result.addAll(this.registrar.getScheduledTasks());
    return result;
  }
  // Bean销毁之前执行。移除掉所有的任务,并且取消所有的任务
  @Override
  public void postProcessBeforeDestruction(Object bean, String beanName) {
    Set<ScheduledTask> tasks;
    synchronized (this.scheduledTasks) {
      tasks = this.scheduledTasks.remove(bean);
    }
    if (tasks != null) {
      for (ScheduledTask task : tasks) {
        task.cancel();
      }
    }
  }
  ...
}
相关文章
|
2月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
Java
Spring5入门到实战------9、AOP基本概念、底层原理、JDK动态代理实现
这篇文章是Spring5框架的实战教程,深入讲解了AOP的基本概念、如何利用动态代理实现AOP,特别是通过JDK动态代理机制在不修改源代码的情况下为业务逻辑添加新功能,降低代码耦合度,并通过具体代码示例演示了JDK动态代理的实现过程。
Spring5入门到实战------9、AOP基本概念、底层原理、JDK动态代理实现
|
3月前
|
Java 应用服务中间件 开发者
Java面试题:解释Spring Boot的优势及其自动配置原理
Java面试题:解释Spring Boot的优势及其自动配置原理
101 0
|
3月前
|
设计模式 监控 Java
解析Spring Cloud中的断路器模式原理
解析Spring Cloud中的断路器模式原理
|
2月前
|
XML Java 数据格式
Spring5入门到实战------2、IOC容器底层原理
这篇文章深入探讨了Spring5框架中的IOC容器,包括IOC的概念、底层原理、以及BeanFactory接口和ApplicationContext接口的介绍。文章通过图解和实例代码,解释了IOC如何通过工厂模式和反射机制实现对象的创建和管理,以及如何降低代码耦合度,提高开发效率。
Spring5入门到实战------2、IOC容器底层原理
|
2月前
|
Java 程序员 数据库连接
女朋友不懂Spring事务原理,今天给她讲清楚了!
该文章讲述了如何解释Spring事务管理的基本原理,特别是针对女朋友在面试中遇到的问题。文章首先通过一个简单的例子引入了传统事务处理的方式,然后详细讨论了Spring事务管理的实现机制。
女朋友不懂Spring事务原理,今天给她讲清楚了!
|
3月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
NoSQL Java 调度
在Spring Boot中实现分布式任务调度
在Spring Boot中实现分布式任务调度
|
3月前
|
XML Java 数据格式
深入理解Spring中的依赖注入原理
深入理解Spring中的依赖注入原理
|
7天前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
下一篇
无影云桌面