Spring 定时任务详解

简介: 《读尽源码》

EnableScheduling

  • 首先关注的类为启动定时任务的注解@EnableScheduling
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}Copy to clipboardErrorCopied

SchedulingConfiguration

  • 注册定时任务相关信息
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
    /**
     * 开启定时任务
     * @return
     */
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        // 注册 ScheduledAnnotationBeanPostProcessor
        return new ScheduledAnnotationBeanPostProcessor();
    }
}Copy to clipboardErrorCopied

ScheduledAnnotationBeanPostProcessor

  • 关注 application 事件,以及 spring 生命周期相关的接口实现
/**
     * application 事件
     * @param event the event to respond to
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            // Running in an ApplicationContext -> register tasks this late...
            // giving other ContextRefreshedEvent listeners a chance to perform
            // their work at the same time (e.g. Spring Batch's job registration).
            // 注册定时任务
            finishRegistration();
        }
    }Copy to clipboardErrorCopied
@Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
        // 当前类
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            // 方法扫描,存在 Scheduled、Schedules  注解的全部扫描
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        // 处理 scheduled 相关信息
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }Copy to clipboardErrorCopied
  • 处理定时任务注解
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
            // Determine initial delay
            // 是否延迟执行
            long initialDelay = scheduled.initialDelay();
            // 延迟执行时间
            String initialDelayString = scheduled.initialDelayString();
            // 是否有延迟执行的时间
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = parseDelayAsLong(initialDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }
            // Check cron expression
            // 获取cron表达式
            String cron = scheduled.cron();
            // cron表达式是否存在
            if (StringUtils.hasText(cron)) {
                // 获取时区
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    // 字符串转换
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    // cron 是否延迟
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            // 时区解析
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            // 默认时区获取
                            timeZone = TimeZone.getDefault();
                        }
                        // 创建任务
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }
            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }
            // Check fixed delay
            // 获取间隔调用时间
            long fixedDelay = scheduled.fixedDelay();
            // 间隔时间>0
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                // 创建任务,间隔时间定时任务
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
            // 延迟时间
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = parseDelayAsLong(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    // 创建延迟时间任务
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }
            // Check fixed rate
            // 获取调用频率
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                // 创建调用频率的定时任务
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = parseDelayAsLong(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }
            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);
            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                // 定时任务注册
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }Copy to clipboardErrorCopied

定时任务

  • CronTask
  • cron 定时任务
  • FixedDelayTask
  • 间隔时间的定时任务
  • FixedRateTask
  • 调用频率的定时任务
  • ScheduledTask
  • 定时任务对象

cron 表达式解析

  • org.springframework.scheduling.support.CronSequenceGenerator.doParse
private void doParse(String[] fields) {
        setNumberHits(this.seconds, fields[0], 0, 60);
        setNumberHits(this.minutes, fields[1], 0, 60);
        setNumberHits(this.hours, fields[2], 0, 24);
        setDaysOfMonth(this.daysOfMonth, fields[3]);
        setMonths(this.months, fields[4]);
        setDays(this.daysOfWeek, replaceOrdinals(fields[5], "SUN,MON,TUE,WED,THU,FRI,SAT"), 8);
        if (this.daysOfWeek.get(7)) {
            // Sunday can be represented as 0 or 7
            this.daysOfWeek.set(0);
            this.daysOfWeek.clear(7);
        }
    }
Copy to clipboardErrorCopied

执行定时任务

  • 这里以 CronTask 任务进行分析,其他定时任务同理
  • org.springframework.scheduling.config.ScheduledTaskRegistrar.scheduleCronTask
@Nullable
    public ScheduledTask scheduleCronTask(CronTask task) {
        // 从未执行的任务列表中删除,并且获取这个任务
        ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
        boolean newTask = false;
        // 没有这个任务
        if (scheduledTask == null) {
            scheduledTask = new ScheduledTask(task);
            newTask = true;
        }
        // 任务调度器是否为空
        if (this.taskScheduler != null) {
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
        }
        else {
            // 添加到cron任务列表
            addCronTask(task);
            // 保存到没有执行的任务中
            this.unresolvedTasks.put(task, scheduledTask);
        }
        return (newTask ? scheduledTask : null);
    }
相关文章
|
7月前
|
druid Java 数据库
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
|
7月前
|
Java 数据库 Spring
Spring Boot 实现定时任务的动态增删启停
Spring Boot 实现定时任务的动态增删启停
102 0
|
4月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
143 1
|
2月前
|
Java BI 调度
Java Spring的定时任务的配置和使用
遵循上述步骤,你就可以在Spring应用中轻松地配置和使用定时任务,满足各种定时处理需求。
158 1
|
2月前
|
存储 Java API
简单两步,Spring Boot 写死的定时任务也能动态设置:技术干货分享
【10月更文挑战第4天】在Spring Boot开发中,定时任务通常通过@Scheduled注解来实现,这种方式简单直接,但存在一个显著的限制:任务的执行时间或频率在编译时就已经确定,无法在运行时动态调整。然而,在实际工作中,我们往往需要根据业务需求或外部条件的变化来动态调整定时任务的执行计划。本文将分享一个简单两步的解决方案,让你的Spring Boot应用中的定时任务也能动态设置,从而满足更灵活的业务需求。
154 4
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15050 32
|
7月前
|
NoSQL Java Redis
Spring Boot 监听 Redis Key 失效事件实现定时任务
Spring Boot 监听 Redis Key 失效事件实现定时任务
145 0
|
4月前
|
Java 开发者 Spring
Spring Boot实战宝典:揭秘定时任务的幕后英雄,让业务处理如流水般顺畅,轻松驾驭时间管理艺术!
【8月更文挑战第29天】在现代应用开发中,定时任务如数据备份、报告生成等至关重要。Spring Boot作为流行的Java框架,凭借其强大的集成能力和简洁的配置方式,为开发者提供了高效的定时任务解决方案。本文详细介绍了如何在Spring Boot项目中启用定时任务支持、编写定时任务方法,并通过实战案例展示了其在业务场景中的应用,同时提供了注意事项以确保任务的正确执行。
57 0
|
4月前
|
Dubbo Java 调度
揭秘!Spring Cloud Alibaba的超级力量——如何轻松驾驭分布式定时任务调度?
【8月更文挑战第20天】在现代微服务架构中,Spring Cloud Alibaba通过集成分布式定时任务调度功能解决了一致性和可靠性挑战。它利用TimerX实现任务的分布式编排与调度,并通过`@SchedulerLock`确保任务不被重复执行。示例代码展示了如何配置定时任务及其分布式锁,以实现每5秒仅由一个节点执行任务,适合构建高可用的微服务系统。
72 0
|
5月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。