前言
JDK给我们提供了定时任务的能力,详解之前有篇博文:
【小家java】Java定时任务ScheduledThreadPoolExecutor详解以及与Timer、TimerTask的区别(执行指定次数停止任务)
而Spring基于此做了更便捷的封装,使得我们使用起来异常的方便~
定时任务也是平时开发不可缺少的一个使用场景,本文主要看看Spring是怎么来实现这一套逻辑的?
Demo
@Service public class HelloServiceImpl implements HelloService { // @Schedules // 它是允许重复注解的~~~~ //@Scheduled(cron = "0/5 * * * * ?") @Scheduled(cron = "0/2 * * * * ?") // 每2秒钟执行一次 public void job1() { System.out.println("我执行了~~" + LocalTime.now()); } }
然后让Spring开启定时任务的支持即可。
@Configuration @EnableScheduling // 开启定时任务 public class RootConfig { }
输出如下:每两秒钟执行一次
我执行了~~14:46:36.003 我执行了~~14:46:38.002 我执行了~~14:46:40.001 我执行了~~14:46:42.001 我执行了~~14:46:44.002 ...
原理、源码分析
很显然又是@EnableXXX
的设计模式,因此入口又从该注解开始吧~~~
@EnableScheduling
//@since 3.1 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(SchedulingConfiguration.class) @Documented public @interface EnableScheduling { }
可以看出它没有任何属性。所以只有一个@Import
在起作用,因此重点看看SchedulingConfiguration
它的效果同XML中的
<task:annotation-driven>
SchedulingConfiguration
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
同样的,也非常非常的简单。只是向容器注入了一个ScheduledAnnotationBeanPostProcessor,它是一个PostProcessor,同时也是一个SmartInitializingSingleton等等
ScheduledAnnotationBeanPostProcessor
Scheduled注解后处理器,项目启动时会扫描所有标记了@Scheduled注解的方法,封装成ScheduledTask注册起来。
这个处理器是处理定时任务的核心类,比较复杂。下面也是结合源码,来看看它具体的一个工作内容:
// 首先:非常震撼的是,它实现的接口非常的多。还好的是,大部分接口我们都很熟悉了。 // MergedBeanDefinitionPostProcessor:它是个BeanPostProcessor // DestructionAwareBeanPostProcessor:在销毁此Bean的时候,会调用对应方法 // SmartInitializingSingleton:它会在所有的单例Bean都完成了初始化后,调用这个接口的方法 // EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware:都是些感知接口 // DisposableBean:该Bean销毁的时候会调用 // ApplicationListener<ContextRefreshedEvent>:监听容器的`ContextRefreshedEvent`事件 // ScheduledTaskHolder:维护本地的ScheduledTask实例 public class ScheduledAnnotationBeanPostProcessor implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean { /** * The default name of the {@link TaskScheduler} bean to pick up: "taskScheduler". * <p>Note that the initial lookup happens by type; this is just the fallback * in case of multiple scheduler beans found in the context. * @since 4.2 */ // 看着注释就知道,和@Async的默认处理一样~~~~先类型 在回退到名称 public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; // 调度器(若我们没有配置,它是null的) @Nullable private Object scheduler; // 这些都是Awire感知接口注入进来的~~ @Nullable private StringValueResolver embeddedValueResolver; @Nullable private String beanName; @Nullable private BeanFactory beanFactory; @Nullable private ApplicationContext applicationContext; // ScheduledTaskRegistrar:ScheduledTask注册中心,ScheduledTaskHolder接口的一个重要的实现类,维护了程序中所有配置的ScheduledTask // 内部会处理调取器得工作,因此我建议先移步,看看这个类得具体分析 private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar(); // 缓存,没有被标注注解的class们 // 这有个技巧,使用了newSetFromMap,自然而然的这个set也就成了一个线程安全的set private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64)); // 缓存对应的Bean上 里面对应的 ScheduledTask任务。可议有多个哦~~ // 注意:此处使用了IdentityHashMap private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16); // 希望此processor是最后执行的~ @Override public int getOrder() { return LOWEST_PRECEDENCE; } //Set the {@link org.springframework.scheduling.TaskScheduler} that will invoke the scheduled methods // 也可以是JDK的ScheduledExecutorService(内部会给你包装成一个TaskScheduler) // 若没有指定。那就会走默认策略:去从起中先按照类型找`TaskScheduler`该类型(或者ScheduledExecutorService这个类型也成)的。 // 若有多个该类型或者找不到,就安好"taskScheduler"名称去找 // 再找不到,就用系统默认的: public void setScheduler(Object scheduler) { this.scheduler = scheduler; } ... // 此方法会在该容器内所有的单例Bean已经初始化全部结束后,执行 @Override public void afterSingletonsInstantiated() { // Remove resolved singleton classes from cache // 因为已经是最后一步了,所以这个缓存可议清空了 this.nonAnnotatedClasses.clear(); // 在容器内运行,ApplicationContext都不会为null if (this.applicationContext == null) { // Not running in an ApplicationContext -> register tasks early... // 如果不是在ApplicationContext下运行的,那么就应该提前注册这些任务 finishRegistration(); } } // 兼容容器刷新的时间(此时候容器硬启动完成了) 它还在`afterSingletonsInstantiated`的后面执行 @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 这个动作务必要做:因为Spring可能有多个容器,所以可能会发出多个ContextRefreshedEvent 事件 // 显然我们只处理自己容器发出来得事件,别的容器发出来我不管~~ if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late... // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). // 为其他ContextRefreshedEvent侦听器提供同时执行其工作的机会(例如,Spring批量工作注册) finishRegistration(); } } private void finishRegistration() { // 如果setScheduler了,就以调用者指定的为准~~~ if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } // 这里继续厉害了:从容器中找到所有的接口`SchedulingConfigurer`的实现类(我们可议通过实现它定制化scheduler) if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); // 同@Async只允许设置一个不一样的是,这里每个都会让它生效 // 但是平时使用,我们自顶一个类足矣~~~ AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } // 至于task是怎么注册进registor的,请带回看`postProcessAfterInitialization`这个方法的实现 // 有任务并且registrar.getScheduler() == null,那就去容器里找来试试~~~ if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { ... // 这块逻辑和@Async的处理一毛一样。忽略了 主要看看resolveSchedulerBean()这个方法即可 } this.registrar.afterPropertiesSet(); } // 从容器中去找一个 private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) { // 若按名字去查找,那就按照名字找 if (byName) { T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType); // 这个处理非常非常有意思,就是说倘若找到了你可以在任意地方直接@Autowired这个Bean了,可以拿这个共用Scheduler来调度我们自己的任务啦~~ if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory) { ((ConfigurableBeanFactory) this.beanFactory).registerDependentBean( DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName); } return scheduler; } // 按照schedulerType该类型的名字匹配resolveNamedBean 底层依赖:getBeanNamesForType else if (beanFactory instanceof AutowireCapableBeanFactory) { NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType); if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) { ((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName); } return holder.getBeanInstance(); } // 按照类型找 else { return beanFactory.getBean(schedulerType); } } @Override public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) { } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; } // Bean初始化完成后执行。去看看Bean里面有没有标注了@Scheduled的方法~~ @Override public Object postProcessAfterInitialization(final Object bean, String beanName) { // 拿到目标类型(因为此类有可能已经被代理过) Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); // 这里对没有标注注解的类做了一个缓存,防止从父去扫描(毕竟可能有多个容器,可能有重复扫描的现象) if (!this.nonAnnotatedClasses.contains(targetClass)) { // 如下:主要用到了MethodIntrospector.selectMethods 这个内省方法工具类的这个g工具方法,去找指定Class里面,符合条件的方法们 Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { //过滤Method的核心逻辑就是是否标注有此注解(Merged表示标注在父类、或者接口处也是ok的) Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass()); } } // 此处相当于已经找到了对应的注解方法~~~ else { // Non-empty set of methods // 这里有一个双重遍历。因为一个方法上,可能重复标注多个这样的注解~~~~~ // 所以最终遍历出来后,就交给processScheduled(scheduled, method, bean)去处理了 annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; } // 这个方法就是灵魂了。就是执行这个注解,最终会把这个任务注册进去,并且启动的~~~ protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { // 标注此注解的方法必须是无参的方法 Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); // 拿到最终要被调用的方法 做这么一步操作主要是防止方法被代理了 Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); // 把该方法包装成一个Runnable 线程~~~ Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; // 装载任务,这里长度定为4,因为Spring认为标注4个注解还不够你用的? Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay // 计算出延时多长时间执行 initialDelayString 支持占位符如:@Scheduled(fixedDelayString = "${time.fixedDelay}") // 这段话得意思是,最终拿到一个initialDelay值~~~~~Long型的 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression // 解析cron String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); // 由此课件,cron也可以使用占位符。把它配置在配置文件里就成~~~zone也是支持占位符的 if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 这个相当于,如果配置了cron,它就是一个task了,就可以吧任务注册进registrar里面了 // 这里面的处理是。如果已经有调度器taskScheduler了,那就立马准备执行了 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } ... // 下面就不再说了,就是解析fixed delay、fixed rated、 // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks // 最后吧这些任务都放在全局属性里保存起来~~~~ // getScheduledTasks()方法是会把所有的任务都返回出去的~~~ScheduledTaskHolder接口就一个Set<ScheduledTask> getScheduledTasks();方法嘛 synchronized (this.scheduledTasks) { Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean); if (registeredTasks == null) { registeredTasks = new LinkedHashSet<>(4); this.scheduledTasks.put(bean, registeredTasks); } registeredTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } } private static long parseDelayAsLong(String value) throws RuntimeException { if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private static boolean isP(char ch) { return (ch == 'P' || ch == 'p'); } //@since 5.0.2 获取到所有的任务。包含本实例的,以及registrar(手动注册)的所有任务 @Override public Set<ScheduledTask> getScheduledTasks() { Set<ScheduledTask> result = new LinkedHashSet<>(); synchronized (this.scheduledTasks) { Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values(); for (Set<ScheduledTask> tasks : allTasks) { result.addAll(tasks); } } result.addAll(this.registrar.getScheduledTasks()); return result; } // Bean销毁之前执行。移除掉所有的任务,并且取消所有的任务 @Override public void postProcessBeforeDestruction(Object bean, String beanName) { Set<ScheduledTask> tasks; synchronized (this.scheduledTasks) { tasks = this.scheduledTasks.remove(bean); } if (tasks != null) { for (ScheduledTask task : tasks) { task.cancel(); } } } ... }