前提
最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过Quartz
、XXL-Job
、Easy Scheduler
等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring
内置的Scheduling
模块。而原生的Scheduling
模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled
注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling
模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC
数据源。
Scheduling模块
Scheduling
模块是spring-context
依赖下的一个包org.springframework.scheduling
:
这个模块的类并不多,有四个子包:
- 顶层包的定义了一些通用接口和异常。
org.springframework.scheduling.annotation
:定义了调度、异步任务相关的注解和解析类,常用的注解如@Async
、@EnableAsync
、@EnableScheduling
和@Scheduled
。org.springframework.scheduling.concurrent
:定义了调度任务执行器和相对应的FactoryBean
。org.springframework.scheduling.config
:定义了配置解析、任务具体实现类、调度任务XML
配置文件解析相关的解析类。org.springframework.scheduling.support
:定义了反射支持类、Cron
表达式解析器等工具类。
如果想单独使用Scheduling
,只需要引入spring-context
这个依赖。但是现在流行使用SpringBoot
,引入spring-boot-starter-web
已经集成了spring-context
,可以直接使用Scheduling
模块,笔者编写本文的时候(2020-03-14
)SpringBoot
的最新版本为2.2.5.RELEASE
,可以选用此版本进行源码分析或者生产应用:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>2.2.5.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> 复制代码
开启Scheduling
模块支持只需要在某一个配置类中添加@EnableScheduling
注解即可,一般为了明确模块的引入,建议在启动类中使用此注解,如:
@EnableScheduling @SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } } 复制代码
Scheduling模块的工作流程
这个图描述了Scheduling
模块的工作流程,这里分析一下非XML
配置下的流程(右边的分支):
- 通过注解
@EnableScheduling
中的@Import
引入了SchedulingConfiguration
,而SchedulingConfiguration
中配置了一个类型为ScheduledAnnotationBeanPostProcessor
名称为org.springframework.context.annotation.internalScheduledAnnotationProcessor
的Bean
,这里有个常见的技巧,Spring
内部加载的Bean
一般会定义名称为internalXXX
,Bean
的role
会定义为ROLE_INFRASTRUCTURE = 2
。 Bean
后置处理器ScheduledAnnotationBeanPostProcessor
会解析和处理每一个符合特定类型的Bean
中的@Scheduled
注解(注意@Scheduled
只能使用在方法或者注解上),并且把解析完成的方法封装为不同类型的Task
实例,缓存在ScheduledTaskRegistrar
中的。ScheduledAnnotationBeanPostProcessor
中的钩子接口方法afterSingletonsInstantiated()
在所有单例初始化完成之后回调触发,在此方法中设置了ScheduledTaskRegistrar
中的任务调度器(TaskScheduler
或者ScheduledExecutorService
类型)实例,并且调用ScheduledTaskRegistrar#afterPropertiesSet()
方法添加所有缓存的Task
实例到任务调度器中执行。
任务调度器
Scheduling
模块支持TaskScheduler
或者ScheduledExecutorService
类型的任务调度器,而ScheduledExecutorService
其实是JDK
并发包java.util.concurrent
的接口,一般实现类就是调度线程池ScheduledThreadPoolExecutor
。实际上,ScheduledExecutorService
类型的实例最终会通过适配器模式转变为ConcurrentTaskScheduler
,所以这里只需要分析TaskScheduler
类型的执行器。
ThreadPoolTaskScheduler
:基于线程池实现的任务执行器,这个是最常用的实现,底层依赖于ScheduledThreadPoolExecutor
实现。ConcurrentTaskScheduler
:TaskScheduler
接口和ScheduledExecutorService
接口的适配器,如果自定义一个ScheduledThreadPoolExecutor
类型的Bean
,那么任务执行器就会适配为ConcurrentTaskScheduler
。DefaultManagedTaskScheduler
:JDK7
引入的JSR-236
的支持,可以通过JNDI
配置此调度执行器,一般很少用到,底层也是依赖于ScheduledThreadPoolExecutor
实现。
也就是说,内置的三个调度器类型底层都依赖于JUC
调度线程池ScheduledThreadPoolExecutor
。这里分析一下顶层接口org.springframework.scheduling.TaskScheduler
提供的功能(笔者已经把功能一致的default
方法暂时移除):
// 省略一些功能一致的default方法 public interface TaskScheduler { // 调度一个任务,通过触发器实例指定触发时间周期 ScheduledFuture<?> schedule(Runnable task, Trigger trigger); // 指定起始时间调度一个任务 - 单次执行 ScheduledFuture<?> schedule(Runnable task, Date startTime); // 指定固定频率调度一个任务,period的单位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period); // 指定起始时间和固定频率调度一个任务,period的单位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); // 指定固定延迟间隔调度一个任务,delay的单位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay); // 指定起始时间和固定延迟间隔调度一个任务,delay的单位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay); } 复制代码
Task的分类
Scheduling
模块中支持不同类型的任务,主要包括下面的3种(解析的优先顺序也是如下):
Cron
表达式任务,支持通过Cron
表达式配置执行的周期,对应的任务类型为org.springframework.scheduling.config.CronTask
。- 固定延迟间隔任务,也就是上一轮执行完毕后间隔固定周期再执行本轮,依次类推,对应的的任务类型为
org.springframework.scheduling.config.FixedDelayTask
。 - 固定频率任务,基于固定的间隔时间执行,不会理会上一轮是否执行完毕本轮会照样执行,对应的的任务类型为
org.springframework.scheduling.config.FixedRateTask
。
关于这几类Task
,举几个简单的例子。CronTask
是通过cron
表达式指定执行周期的,并且不支持延迟执行,可以使用特殊字符-
禁用任务执行:
// 注解声明式使用 - 每五秒执行一次,不支持initialDelay @Scheduled(cron = "*/5 * * * * ?") public void processTask(){ } // 注解声明式使用 - 禁止任务执行 @Scheduled(cron = "-") public void processTask(){ } // 编程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); CronTask cronTask = new CronTask(() -> { System.out.println(String.format("[%s] - CronTask触发...", F.format(LocalDateTime.now()))); }, "*/5 * * * * ?"); taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger()); Thread.sleep(Integer.MAX_VALUE); } } // 某次执行输出结果 [2020-03-16 01:07:00] - CronTask触发... [2020-03-16 01:07:05] - CronTask触发... ...... 复制代码
FixedDelayTask
需要配置延迟间隔值(fixedDelay
或者fixedDelayString
)和可选的起始延迟执行时间(initialDelay
或者initialDelayString
),这里注意一点是fixedDelayString
和initialDelayString
都支持从EmbeddedValueResolver
(简单理解为配置文件的属性处理器)读取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 注解声明式使用 - 延迟一秒开始执行,延迟间隔为5秒 @Scheduled(fixedDelay = 5000, initialDelay = 1000) public void process(){ } // 注解声明式使用 - spring-boot配置文件中process.task.fixedDelay=5000 process.task.initialDelay=1000 @Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 编程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> { System.out.println(String.format("[%s] - FixedDelayTask触发...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay()); taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次执行输出结果 [2020-03-16 01:06:12] - FixedDelayTask触发... [2020-03-16 01:06:17] - FixedDelayTask触发... ...... 复制代码
FixedRateTask
需要配置固定间隔值(fixedRate
或者fixedRateString
)和可选的起始延迟执行时间(initialDelay
或者initialDelayString
),这里注意一点是fixedRateString
和initialDelayString
都支持从EmbeddedValueResolver
(简单理解为配置文件的属性处理器)读取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 注解声明式使用 - 延迟一秒开始执行,每隔5秒执行一次 @Scheduled(fixedRate = 5000, initialDelay = 1000) public void processTask(){ } // 注解声明式使用 - spring-boot配置文件中process.task.fixedRate=5000 process.task.initialDelay=1000 @Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 编程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedRateTask fixedRateTask = new FixedRateTask(() -> { System.out.println(String.format("[%s] - FixedRateTask触发...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay()); taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次执行输出结果 [2020-03-16 23:58:25] - FixedRateTask触发... [2020-03-16 23:58:30] - FixedRateTask触发... ...... 复制代码
简单分析核心流程的源代码
在SpringBoot
注解体系下,Scheduling
模块的所有逻辑基本在ScheduledAnnotationBeanPostProcessor
和ScheduledTaskRegistrar
中。一般来说,一个类实现的接口代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor
实现的接口:
ScheduledTaskHolder
接口:返回Set<ScheduledTask>
,表示持有的所有任务实例。MergedBeanDefinitionPostProcessor
接口:Bean
定义合并时回调,预留空实现,暂时不做任何处理。BeanPostProcessor
接口:也就是MergedBeanDefinitionPostProcessor
的父接口,Bean
实例初始化前后分别回调,其中,后回调的postProcessAfterInitialization()
方法就是用于解析@Scheduled
和装载ScheduledTask
,需要重点关注此方法的逻辑。DestructionAwareBeanPostProcessor
接口:具体的Bean
实例销毁的时候回调,用于Bean
实例销毁的时候移除和取消对应的任务实例。Ordered
接口:用于Bean
加载时候的排序,主要是改变ScheduledAnnotationBeanPostProcessor
在BeanPostProcessor
执行链中的顺序。EmbeddedValueResolverAware
接口:回调StringValueResolver
实例,用于解析带占位符的环境变量属性值。BeanNameAware
接口:回调BeanName
。BeanFactoryAware
接口:回调BeanFactory
实例,具体是DefaultListableBeanFactory
,也就是熟知的IOC
容器。ApplicationContextAware
接口:回调ApplicationContext
实例,也就是熟知的Spring
上下文,它是IOC
容器的门面,同时是事件广播器、资源加载器的实现等等。SmartInitializingSingleton
接口:所有单例实例化完毕之后回调,作用是在持有的applicationContext
为NULL
的时候开始调度所有加载完成的任务,这个钩子接口十分有用,笔者常用它做一些资源初始化工作。ApplicationListener
接口:监听Spring
应用的事件,具体是ApplicationListener<ContextRefreshedEvent>
,监听上下文刷新的事件,如果事件中携带的ApplicationContext
实例和ApplicationContextAware
回调的ApplicationContext
实例一致,那么在此监听回调方法中开始调度所有加载完成的任务,也就是在ScheduledAnnotationBeanPostProcessor
这个类中,SmartInitializingSingleton
接口的实现和ApplicationListener
接口的实现逻辑是互斥的。DisposableBean
接口:当前Bean
实例销毁时候回调,也就是ScheduledAnnotationBeanPostProcessor
自身被销毁的时候回调,用于取消和清理所有的ScheduledTask
。
上面分析的钩子接口在SpringBoot体系中可以按需使用,了解回调不同钩子接口的回调时机,可以在特定时机完成达到理想的效果。
@Scheduled
注解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization(Object bean, String beanName) { // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三种类型的Bean if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } // 获取Bean的用户态类型,例如Bean有可能被CGLIB增强,这个时候要取其父类 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); // nonAnnotatedClasses存放着不存在@Scheduled注解的类型,缓存起来避免重复判断它是否携带@Scheduled注解的方法 if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { // 因为JDK8之后支持重复注解,因此获取具体类型中Method -> @Scheduled的集合,也就是有可能一个方法使用多个@Scheduled注解,最终会封装为多个Task Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); // 解析到类型中不存在@Scheduled注解的方法添加到nonAnnotatedClasses缓存 if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Method -> @Scheduled的集合遍历processScheduled()方法进行登记 annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; } 复制代码
processScheduled(Scheduled scheduled, Method method, Object bean)
就是具体的注解解析和Task
封装的方法:
// Runnable适配器 - 用于反射调用具体的方法,触发任务方法执行 public class ScheduledMethodRunnable implements Runnable { private final Object target; private final Method method; public ScheduledMethodRunnable(Object target, Method method) { this.target = target; this.method = method; } ....// 省略无关代码 // 这个就是最终的任务方法执行的核心方法,抑制修饰符,然后反射调用 @Override public void run() { try { ReflectionUtils.makeAccessible(this.method); this.method.invoke(this.target); } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } } } // 通过方法所在Bean实例和方法封装Runnable适配器ScheduledMethodRunnable实例 protected Runnable createRunnable(Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } // 这个方法十分长,不过逻辑并不复杂,它只做了四件事 // 0. 解析@Scheduled中的initialDelay、initialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行 // 1. 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存 // 2. 解析@Scheduled中的fixedDelay、fixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存 // 3. 解析@Scheduled中的fixedRate、fixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存 protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { // 通过方法宿主Bean和目标方法封装Runnable适配器ScheduledMethodRunnable实例 Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; // 缓存已经装载的任务 Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay // 解析初始化延迟执行时间,initialDelayString支持占位符配置,如果initialDelayString配置了,会覆盖initialDelay的值 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression // 解析时区zone的值,支持支持占位符配置,判断cron是否存在,存在则装载为CronTask String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 此方法虽然表面上是调度CronTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中 // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore // 修正小于0的初始化延迟执行时间值为0 if (initialDelay < 0) { initialDelay = 0; } // 解析fixedDelay和fixedDelayString,如果同时配置,fixedDelayString最终解析出来的整数值会覆盖fixedDelay,封装为FixedDelayTask long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } // 此方法虽然表面上是调度FixedDelayTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中 // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // 解析fixedRate和fixedRateString,如果同时配置,fixedRateString最终解析出来的整数值会覆盖fixedRate,封装为FixedRateTask long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } // 此方法虽然表面上是调度FixedRateTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中 // 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { // 注册所有任务实例,这个映射Key为宿主Bean实例,Value为List<ScheduledTask>,后面用于调度所有注册完成的任务 Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } } 复制代码
总的来说,这个方法做了四件事:
- 解析
@Scheduled
中的initialDelay
、initialDelayString
属性,适用于FixedDelayTask
或者FixedRateTask
的延迟执行。 - 优先解析
@Scheduled
中的cron
属性,封装为CronTask
,通过ScheduledTaskRegistrar
进行缓存。 - 解析
@Scheduled
中的fixedDelay
、fixedDelayString
属性,封装为FixedDelayTask
,通过ScheduledTaskRegistrar
进行缓存。 - 解析
@Scheduled
中的fixedRate
、fixedRateString
属性,封装为FixedRateTask
,通过ScheduledTaskRegistrar
进行缓存。
@Scheduled
修饰的某个方法如果同时配置了cron
、fixedDelay|fixedDelayString
和fixedRate|fixedRateString
属性,意味着此方法同时封装为三种任务CronTask
、FixedDelayTask
和FixedRateTask
。解析xxString
值的使用,用到了EmbeddedValueResolver
解析字符串的值,支持占位符,这样可以直接获取环境配置中的占位符属性(基于SPEL
的特性,甚至可以支持嵌套占位符)。解析成功的所有任务实例存放在ScheduledAnnotationBeanPostProcessor
的一个映射scheduledTasks
中:
// 宿主Bean实例 -> 解析完成的任务实例Set private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16); 复制代码
解析和缓存工作完成之后,接着分析最终激活所有调度任务的逻辑,见互斥方法afterSingletonsInstantiated()
和onApplicationEvent()
,两者中一定只有一个方法能够调用finishRegistration()
:
// 所有单例实例化完毕之后回调 public void afterSingletonsInstantiated() { // Remove resolved singleton classes from cache this.nonAnnotatedClasses.clear(); if (this.applicationContext == null) { // Not running in an ApplicationContext -> register tasks early... finishRegistration(); } } // 上下文刷新完成之后回调 @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late... // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). finishRegistration(); } } // private void finishRegistration() { // 如果持有的scheduler对象不为null则设置ScheduledTaskRegistrar中的任务调度器 if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } // 这个判断一般会成立,得到的BeanFactory就是DefaultListableBeanFactory if (this.beanFactory instanceof ListableBeanFactory) { // 获取所有的调度配置器SchedulingConfigurer实例,并且都回调configureTasks()方法,这个很重要,它是用户动态装载调取任务的扩展钩子接口 Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); // SchedulingConfigurer实例列表排序 AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } // 下面这一大段逻辑都是为了从BeanFactory取出任务调度器实例,主要判断TaskScheduler或者ScheduledExecutorService类型的Bean,包括尝试通过类型或者名字获取 // 获取成功后设置到ScheduledTaskRegistrar中 if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { logger.trace("Could not find unique TaskScheduler bean", ex); try { this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.trace("Could not find default TaskScheduler bean", ex); // Search for ScheduledExecutorService bean next... try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { logger.trace("Could not find unique ScheduledExecutorService bean", ex2); try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { logger.trace("Could not find default ScheduledExecutorService bean", ex2); // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } // 调用ScheduledTaskRegistrar的afterPropertiesSet()方法,装载所有的调度任务 this.registrar.afterPropertiesSet(); } public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean { // 省略其他代码......... @Override public void afterPropertiesSet() { scheduleTasks(); } // 装载所有调度任务 @SuppressWarnings("deprecation") protected void scheduleTasks() { // 这里注意一点,如果找不到任务调度器实例,那么会用单个线程调度所有任务 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } // 调度所有装载完毕的自定义触发器的任务实例 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } // 调度所有装载完毕的CronTask if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } // 调度所有装载完毕的FixedRateTask if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } // 调度所有装载完毕的FixedDelayTask if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } } // 省略其他代码......... } 复制代码
注意两个个问题:
- 如果没有配置
TaskScheduler
或者ScheduledExecutorService
类型的Bean
,那么调度模块只会创建一个线程去调度所有装载完毕的任务,如果任务比较多,执行密度比较大,很有可能会造成大量任务饥饿,表现为存在部分任务不会触发调度的场景(这个是调度模块生产中经常遇到的故障,需要重点排查是否没有设置TaskScheduler
或者ScheduledExecutorService
)。 SchedulingConfigurer
是调度模块提供给使用的进行扩展的钩子接口,用于在激活所有调度任务之前回调ScheduledTaskRegistrar
实例,只要拿到ScheduledTaskRegistrar
实例,我们就可以使用它注册和装载新的Task
。