通过源码理解Spring中@Scheduled的实现原理并且实现调度任务动态装载(上)

简介: 最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过Quartz、XXL-Job、Easy Scheduler等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring内置的Scheduling模块。而原生的Scheduling模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC数据源。

前提



最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过QuartzXXL-JobEasy Scheduler等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring内置的Scheduling模块。而原生的Scheduling模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC数据源。


Scheduling模块



Scheduling模块是spring-context依赖下的一个包org.springframework.scheduling


微信截图_20220512213621.png


这个模块的类并不多,有四个子包:


  • 顶层包的定义了一些通用接口和异常。
  • org.springframework.scheduling.annotation:定义了调度、异步任务相关的注解和解析类,常用的注解如@Async@EnableAsync@EnableScheduling@Scheduled
  • org.springframework.scheduling.concurrent:定义了调度任务执行器和相对应的FactoryBean
  • org.springframework.scheduling.config:定义了配置解析、任务具体实现类、调度任务XML配置文件解析相关的解析类。
  • org.springframework.scheduling.support:定义了反射支持类、Cron表达式解析器等工具类。


如果想单独使用Scheduling,只需要引入spring-context这个依赖。但是现在流行使用SpringBoot,引入spring-boot-starter-web已经集成了spring-context,可以直接使用Scheduling模块,笔者编写本文的时候(2020-03-14SpringBoot的最新版本为2.2.5.RELEASE,可以选用此版本进行源码分析或者生产应用:


<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring.boot.version>2.2.5.RELEASE</spring.boot.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
复制代码


开启Scheduling模块支持只需要在某一个配置类中添加@EnableScheduling注解即可,一般为了明确模块的引入,建议在启动类中使用此注解,如:


@EnableScheduling
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
复制代码


Scheduling模块的工作流程



微信截图_20220512213632.png


这个图描述了Scheduling模块的工作流程,这里分析一下非XML配置下的流程(右边的分支):


  • 通过注解@EnableScheduling中的@Import引入了SchedulingConfiguration,而SchedulingConfiguration中配置了一个类型为ScheduledAnnotationBeanPostProcessor名称为org.springframework.context.annotation.internalScheduledAnnotationProcessorBean,这里有个常见的技巧,Spring内部加载的Bean一般会定义名称为internalXXXBeanrole会定义为ROLE_INFRASTRUCTURE = 2
  • Bean后置处理器ScheduledAnnotationBeanPostProcessor会解析和处理每一个符合特定类型的Bean中的@Scheduled注解(注意@Scheduled只能使用在方法或者注解上),并且把解析完成的方法封装为不同类型的Task实例,缓存在ScheduledTaskRegistrar中的。
  • ScheduledAnnotationBeanPostProcessor中的钩子接口方法afterSingletonsInstantiated()在所有单例初始化完成之后回调触发,在此方法中设置了ScheduledTaskRegistrar中的任务调度器(TaskScheduler或者ScheduledExecutorService类型)实例,并且调用ScheduledTaskRegistrar#afterPropertiesSet()方法添加所有缓存的Task实例到任务调度器中执行。


任务调度器


Scheduling模块支持TaskScheduler或者ScheduledExecutorService类型的任务调度器,而ScheduledExecutorService其实是JDK并发包java.util.concurrent的接口,一般实现类就是调度线程池ScheduledThreadPoolExecutor。实际上,ScheduledExecutorService类型的实例最终会通过适配器模式转变为ConcurrentTaskScheduler,所以这里只需要分析TaskScheduler类型的执行器。


  • ThreadPoolTaskScheduler:基于线程池实现的任务执行器,这个是最常用的实现,底层依赖于ScheduledThreadPoolExecutor实现。
  • ConcurrentTaskSchedulerTaskScheduler接口和ScheduledExecutorService接口的适配器,如果自定义一个ScheduledThreadPoolExecutor类型的Bean,那么任务执行器就会适配为ConcurrentTaskScheduler
  • DefaultManagedTaskSchedulerJDK7引入的JSR-236的支持,可以通过JNDI配置此调度执行器,一般很少用到,底层也是依赖于ScheduledThreadPoolExecutor实现。


也就是说,内置的三个调度器类型底层都依赖于JUC调度线程池ScheduledThreadPoolExecutor。这里分析一下顶层接口org.springframework.scheduling.TaskScheduler提供的功能(笔者已经把功能一致的default方法暂时移除):


// 省略一些功能一致的default方法
public interface TaskScheduler {
     // 调度一个任务,通过触发器实例指定触发时间周期
     ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
     // 指定起始时间调度一个任务 - 单次执行
     ScheduledFuture<?> schedule(Runnable task, Date startTime);
     // 指定固定频率调度一个任务,period的单位是毫秒
     ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
     // 指定起始时间和固定频率调度一个任务,period的单位是毫秒
     ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
     // 指定固定延迟间隔调度一个任务,delay的单位是毫秒
     ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
    // 指定起始时间和固定延迟间隔调度一个任务,delay的单位是毫秒
     ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
}
复制代码


Task的分类


Scheduling模块中支持不同类型的任务,主要包括下面的3种(解析的优先顺序也是如下):

  1. Cron表达式任务,支持通过Cron表达式配置执行的周期,对应的任务类型为org.springframework.scheduling.config.CronTask
  2. 固定延迟间隔任务,也就是上一轮执行完毕后间隔固定周期再执行本轮,依次类推,对应的的任务类型为org.springframework.scheduling.config.FixedDelayTask
  3. 固定频率任务,基于固定的间隔时间执行,不会理会上一轮是否执行完毕本轮会照样执行,对应的的任务类型为org.springframework.scheduling.config.FixedRateTask


关于这几类Task,举几个简单的例子。CronTask是通过cron表达式指定执行周期的,并且不支持延迟执行,可以使用特殊字符-禁用任务执行:


// 注解声明式使用 - 每五秒执行一次,不支持initialDelay
@Scheduled(cron = "*/5 * * * * ?")
public void processTask(){
}
// 注解声明式使用 - 禁止任务执行
@Scheduled(cron = "-")
public void processTask(){
}
// 编程式使用
public class Tasks {
    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        CronTask cronTask = new CronTask(() -> {
            System.out.println(String.format("[%s] - CronTask触发...", F.format(LocalDateTime.now())));
        }, "*/5 * * * * ?");
        taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次执行输出结果
[2020-03-16 01:07:00] - CronTask触发...
[2020-03-16 01:07:05] - CronTask触发...
......
复制代码


FixedDelayTask需要配置延迟间隔值(fixedDelay或者fixedDelayString)和可选的起始延迟执行时间(initialDelay或者initialDelayString),这里注意一点是fixedDelayStringinitialDelayString都支持从EmbeddedValueResolver(简单理解为配置文件的属性处理器)读取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:


// 注解声明式使用 - 延迟一秒开始执行,延迟间隔为5秒
@Scheduled(fixedDelay = 5000, initialDelay = 1000)
public void process(){
}
// 注解声明式使用 - spring-boot配置文件中process.task.fixedDelay=5000  process.task.initialDelay=1000
@Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 编程式使用
public class Tasks {
    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {
            System.out.println(String.format("[%s] - FixedDelayTask触发...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());
        taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次执行输出结果
[2020-03-16 01:06:12] - FixedDelayTask触发...
[2020-03-16 01:06:17] - FixedDelayTask触发...
......
复制代码


FixedRateTask需要配置固定间隔值(fixedRate或者fixedRateString)和可选的起始延迟执行时间(initialDelay或者initialDelayString),这里注意一点是fixedRateStringinitialDelayString都支持从EmbeddedValueResolver(简单理解为配置文件的属性处理器)读取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:


// 注解声明式使用 - 延迟一秒开始执行,每隔5秒执行一次
@Scheduled(fixedRate = 5000, initialDelay = 1000)
public void processTask(){
}
// 注解声明式使用 - spring-boot配置文件中process.task.fixedRate=5000  process.task.initialDelay=1000
@Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 编程式使用
public class Tasks {
    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        FixedRateTask fixedRateTask = new FixedRateTask(() -> {
            System.out.println(String.format("[%s] - FixedRateTask触发...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());
        taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次执行输出结果
[2020-03-16 23:58:25] - FixedRateTask触发...
[2020-03-16 23:58:30] - FixedRateTask触发...
......
复制代码


简单分析核心流程的源代码


SpringBoot注解体系下,Scheduling模块的所有逻辑基本在ScheduledAnnotationBeanPostProcessorScheduledTaskRegistrar中。一般来说,一个类实现的接口代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor实现的接口:


  • ScheduledTaskHolder接口:返回Set<ScheduledTask>,表示持有的所有任务实例。
  • MergedBeanDefinitionPostProcessor接口:Bean定义合并时回调,预留空实现,暂时不做任何处理。
  • BeanPostProcessor接口:也就是MergedBeanDefinitionPostProcessor的父接口,Bean实例初始化前后分别回调,其中,后回调的postProcessAfterInitialization()方法就是用于解析@Scheduled和装载ScheduledTask,需要重点关注此方法的逻辑。
  • DestructionAwareBeanPostProcessor接口:具体的Bean实例销毁的时候回调,用于Bean实例销毁的时候移除和取消对应的任务实例。
  • Ordered接口:用于Bean加载时候的排序,主要是改变ScheduledAnnotationBeanPostProcessorBeanPostProcessor执行链中的顺序。
  • EmbeddedValueResolverAware接口:回调StringValueResolver实例,用于解析带占位符的环境变量属性值。
  • BeanNameAware接口:回调BeanName
  • BeanFactoryAware接口:回调BeanFactory实例,具体是DefaultListableBeanFactory,也就是熟知的IOC容器。
  • ApplicationContextAware接口:回调ApplicationContext实例,也就是熟知的Spring上下文,它是IOC容器的门面,同时是事件广播器、资源加载器的实现等等。
  • SmartInitializingSingleton接口:所有单例实例化完毕之后回调,作用是在持有的applicationContextNULL的时候开始调度所有加载完成的任务,这个钩子接口十分有用,笔者常用它做一些资源初始化工作。
  • ApplicationListener接口:监听Spring应用的事件,具体是ApplicationListener<ContextRefreshedEvent>,监听上下文刷新的事件,如果事件中携带的ApplicationContext实例和ApplicationContextAware回调的ApplicationContext实例一致,那么在此监听回调方法中开始调度所有加载完成的任务,也就是在ScheduledAnnotationBeanPostProcessor这个类中,SmartInitializingSingleton接口的实现和ApplicationListener接口的实现逻辑是互斥的。
  • DisposableBean接口:当前Bean实例销毁时候回调,也就是ScheduledAnnotationBeanPostProcessor自身被销毁的时候回调,用于取消和清理所有的ScheduledTask


上面分析的钩子接口在SpringBoot体系中可以按需使用,了解回调不同钩子接口的回调时机,可以在特定时机完成达到理想的效果。


@Scheduled注解的解析集中在postProcessAfterInitialization()方法:


public Object postProcessAfterInitialization(Object bean, String beanName) {
    // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三种类型的Bean
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    // 获取Bean的用户态类型,例如Bean有可能被CGLIB增强,这个时候要取其父类
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // nonAnnotatedClasses存放着不存在@Scheduled注解的类型,缓存起来避免重复判断它是否携带@Scheduled注解的方法
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        // 因为JDK8之后支持重复注解,因此获取具体类型中Method -> @Scheduled的集合,也就是有可能一个方法使用多个@Scheduled注解,最终会封装为多个Task
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        // 解析到类型中不存在@Scheduled注解的方法添加到nonAnnotatedClasses缓存
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Method -> @Scheduled的集合遍历processScheduled()方法进行登记
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
复制代码


processScheduled(Scheduled scheduled, Method method, Object bean)就是具体的注解解析和Task封装的方法:


// Runnable适配器 - 用于反射调用具体的方法,触发任务方法执行
public class ScheduledMethodRunnable implements Runnable {
  private final Object target;
  private final Method method;
  public ScheduledMethodRunnable(Object target, Method method) {
    this.target = target;
    this.method = method;
  }
        ....// 省略无关代码
        // 这个就是最终的任务方法执行的核心方法,抑制修饰符,然后反射调用
  @Override
  public void run() {
    try {
      ReflectionUtils.makeAccessible(this.method);
      this.method.invoke(this.target);
    }
    catch (InvocationTargetException ex) {
      ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
    }
    catch (IllegalAccessException ex) {
      throw new UndeclaredThrowableException(ex);
    }
  }    
}
// 通过方法所在Bean实例和方法封装Runnable适配器ScheduledMethodRunnable实例
protected Runnable createRunnable(Object target, Method method) {
  Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
  Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
  return new ScheduledMethodRunnable(target, invocableMethod);
}
// 这个方法十分长,不过逻辑并不复杂,它只做了四件事
// 0. 解析@Scheduled中的initialDelay、initialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行
// 1. 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存
// 2. 解析@Scheduled中的fixedDelay、fixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存
// 3. 解析@Scheduled中的fixedRate、fixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        // 通过方法宿主Bean和目标方法封装Runnable适配器ScheduledMethodRunnable实例
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
        // 缓存已经装载的任务
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
        // Determine initial delay
        // 解析初始化延迟执行时间,initialDelayString支持占位符配置,如果initialDelayString配置了,会覆盖initialDelay的值
        long initialDelay = scheduled.initialDelay();
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = parseDelayAsLong(initialDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }
        // Check cron expression
        // 解析时区zone的值,支持支持占位符配置,判断cron是否存在,存在则装载为CronTask
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    // 此方法虽然表面上是调度CronTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
                    // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }
        // At this point we don't need to differentiate between initial delay set or not anymore
        // 修正小于0的初始化延迟执行时间值为0
        if (initialDelay < 0) {
            initialDelay = 0;
        }
        // 解析fixedDelay和fixedDelayString,如果同时配置,fixedDelayString最终解析出来的整数值会覆盖fixedDelay,封装为FixedDelayTask
        long fixedDelay = scheduled.fixedDelay();
        if (fixedDelay >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }
        String fixedDelayString = scheduled.fixedDelayString();
        if (StringUtils.hasText(fixedDelayString)) {
            if (this.embeddedValueResolver != null) {
                fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
            }
            if (StringUtils.hasLength(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedDelay = parseDelayAsLong(fixedDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                }
                // 此方法虽然表面上是调度FixedDelayTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
                // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
        }
        // 解析fixedRate和fixedRateString,如果同时配置,fixedRateString最终解析出来的整数值会覆盖fixedRate,封装为FixedRateTask
        long fixedRate = scheduled.fixedRate();
        if (fixedRate >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
        String fixedRateString = scheduled.fixedRateString();
        if (StringUtils.hasText(fixedRateString)) {
            if (this.embeddedValueResolver != null) {
                fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
            }
            if (StringUtils.hasLength(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedRate = parseDelayAsLong(fixedRateString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                }
                 // 此方法虽然表面上是调度FixedRateTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
                // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
        }
        // Check whether we had any attribute set
        Assert.isTrue(processedSchedule, errorMessage);
        // Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            // 注册所有任务实例,这个映射Key为宿主Bean实例,Value为List<ScheduledTask>,后面用于调度所有注册完成的任务
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    }
    catch (IllegalArgumentException ex) {
        throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
}
复制代码


总的来说,这个方法做了四件事:


  • 解析@Scheduled中的initialDelayinitialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行。
  • 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存。
  • 解析@Scheduled中的fixedDelayfixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存。
  • 解析@Scheduled中的fixedRatefixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存。


@Scheduled修饰的某个方法如果同时配置了cronfixedDelay|fixedDelayStringfixedRate|fixedRateString属性,意味着此方法同时封装为三种任务CronTaskFixedDelayTaskFixedRateTask。解析xxString值的使用,用到了EmbeddedValueResolver解析字符串的值,支持占位符,这样可以直接获取环境配置中的占位符属性(基于SPEL的特性,甚至可以支持嵌套占位符)。解析成功的所有任务实例存放在ScheduledAnnotationBeanPostProcessor的一个映射scheduledTasks中:


// 宿主Bean实例 -> 解析完成的任务实例Set
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
复制代码


解析和缓存工作完成之后,接着分析最终激活所有调度任务的逻辑,见互斥方法afterSingletonsInstantiated()onApplicationEvent(),两者中一定只有一个方法能够调用finishRegistration()


// 所有单例实例化完毕之后回调
public void afterSingletonsInstantiated() {
    // Remove resolved singleton classes from cache
    this.nonAnnotatedClasses.clear();
    if (this.applicationContext == null) {
        // Not running in an ApplicationContext -> register tasks early...
        finishRegistration();
    }
}
// 上下文刷新完成之后回调
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Running in an ApplicationContext -> register tasks this late...
        // giving other ContextRefreshedEvent listeners a chance to perform
        // their work at the same time (e.g. Spring Batch's job registration).
        finishRegistration();
    }
}
// 
private void finishRegistration() {
    // 如果持有的scheduler对象不为null则设置ScheduledTaskRegistrar中的任务调度器
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
    // 这个判断一般会成立,得到的BeanFactory就是DefaultListableBeanFactory
    if (this.beanFactory instanceof ListableBeanFactory) {
        // 获取所有的调度配置器SchedulingConfigurer实例,并且都回调configureTasks()方法,这个很重要,它是用户动态装载调取任务的扩展钩子接口
        Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        // SchedulingConfigurer实例列表排序
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }
    // 下面这一大段逻辑都是为了从BeanFactory取出任务调度器实例,主要判断TaskScheduler或者ScheduledExecutorService类型的Bean,包括尝试通过类型或者名字获取
    // 获取成功后设置到ScheduledTaskRegistrar中
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }
    // 调用ScheduledTaskRegistrar的afterPropertiesSet()方法,装载所有的调度任务
    this.registrar.afterPropertiesSet();
}
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
    // 省略其他代码.........
    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }
    // 装载所有调度任务
    @SuppressWarnings("deprecation")
    protected void scheduleTasks() {
        // 这里注意一点,如果找不到任务调度器实例,那么会用单个线程调度所有任务
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // 调度所有装载完毕的自定义触发器的任务实例
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        // 调度所有装载完毕的CronTask
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        // 调度所有装载完毕的FixedRateTask
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        // 调度所有装载完毕的FixedDelayTask
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }   
    // 省略其他代码......... 
}
复制代码


注意两个个问题:


  • 如果没有配置TaskScheduler或者ScheduledExecutorService类型的Bean,那么调度模块只会创建一个线程去调度所有装载完毕的任务,如果任务比较多,执行密度比较大,很有可能会造成大量任务饥饿,表现为存在部分任务不会触发调度的场景(这个是调度模块生产中经常遇到的故障,需要重点排查是否没有设置TaskScheduler或者ScheduledExecutorService)。
  • SchedulingConfigurer是调度模块提供给使用的进行扩展的钩子接口,用于在激活所有调度任务之前回调ScheduledTaskRegistrar实例,只要拿到ScheduledTaskRegistrar实例,我们就可以使用它注册和装载新的Task


相关文章
|
1月前
|
XML Java 开发者
Spring Boot中的bean注入方式和原理
Spring Boot中的bean注入方式和原理
61 0
|
1月前
|
druid Java 数据库
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
|
1月前
|
传感器 Java API
Spring揭秘:Aware接口应用场景及实现原理!
Aware接口赋予了Bean更多自感知的能力,通过实现不同的Aware接口,Bean可以轻松地获取到Spring容器中的其他资源引用,像ApplicationContext、BeanFactory等。 这样不仅增强了Bean的功能,还提高了代码的可维护性和扩展性,从而让Spring的IoC容器变得更加强大和灵活。
127 0
Spring揭秘:Aware接口应用场景及实现原理!
|
28天前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
66 1
|
18天前
|
安全 数据安全/隐私保护
Springboot+Spring security +jwt认证+动态授权
Springboot+Spring security +jwt认证+动态授权
|
28天前
|
存储 XML 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南(一)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南
66 0
|
1天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
2天前
|
安全 Java API
Spring工厂API与原理
Spring工厂API与原理
23 10
|
8天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
29天前
|
XML 缓存 Java
天天用 Spring,bean 实例化原理你懂吗
天天用 Spring,bean 实例化原理你懂吗
17 0