SpringBoot+MySQL实现动态定时任务

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 这是一个基于Spring Boot的动态定时任务Demo,利用spring-context模块实现任务调度功能。服务启动时会扫描数据库中的任务表,将任务添加到调度器中,并通过固定频率运行的ScheduleUpdater任务动态更新任务状态和Cron表达式。核心功能包括任务的新增、删除与Cron调整,支持通过ScheduledFuture对象控制任务执行。项目依赖Spring Boot 2.2.10.RELEASE,使用MySQL存储任务信息,包含任务基类ITask及具体实现(如FooTask),便于用户扩展运维界面以增强灵活性。

介绍

一个极简的基于springboot的动态定时任务demo,spring-context模块有对任务调度进行支持,本demo也是基于这个模块进行开发,功能相对简单,用户可以按需改造,比如添加运维界面,在界面上更加灵活的控制任务执行与更新。

基本原理

  • 服务启动后,扫描数据库中的task表,将定时任务添加到调度器中
  • 起一个固定频率执行的ScheduleUpdater任务去扫描数据库,判断任务如有启停、调整cron的话,就更新调度器中的任务。
依赖的核心API

将任务添加到调度器之后,会返回一个ScheduledFuture对象用于控制任务的取消执行。

java

体验AI代码助手

代码解读

复制代码

//添加任务到调度器
@see org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler#schedule(java.lang.Runnable, org.springframework.scheduling.Trigger)
//取消任务执行
@see java.util.concurrent.Future#cancel
@see java.util.concurrent.ScheduledFuture

具体实现

数据库表

sql

体验AI代码助手

代码解读

复制代码

-- ----------------------------
-- Table structure for t_scheduler_info
-- ----------------------------
DROP TABLE IF EXISTS `t_scheduler_info`;
CREATE TABLE `t_scheduler_info` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `task_name` varchar(32) NOT NULL,
  `task_code` varchar(64) NOT NULL,
  `task_cron` varchar(32) NOT NULL,
  `state` tinyint DEFAULT 1 NOT  NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='任务表';

INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('1', 'foo', 'foo', '0/5 * * * * ?', '1');
INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('2', 'bar', 'bar', '0/9 * * * * ?', '0');
INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('3', 'didi', 'didi', '0/10 * * * * ?', '0');

maven依赖

xml

体验AI代码助手

代码解读

复制代码

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.10.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>schedule-demo</artifactId>
    <version>1.0.0</version>
    <name>schedule-demo</name>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

项目启动类及配置类

java

体验AI代码助手

代码解读

复制代码

@SpringBootApplication
@EnableScheduling //开启定时任务支持
public class ScheduleDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleDemoApplication.class, args);
    }

}

@Configuration
@Slf4j
public class ScheduleConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
    }

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        taskScheduler.setThreadNamePrefix("dynamic_schedule_");
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setErrorHandler(t -> {
            //TODO
            t.printStackTrace();
        });
        return taskScheduler;
    }
}
任务动态更新

java

体验AI代码助手

代码解读

复制代码

@Component
@Slf4j
public final class ScheduleTaskService implements InitializingBean {
    
    //任务注册表,存储所有运行的任务,动态增删基于它来操作
    public final static ConcurrentHashMap<String, dynamicScheduleTask> taskHolder = new ConcurrentHashMap<>();

    @Resource
    private ScheduleInfoDao scheduleInfoDao;

    //拿到所有的ITask对象,转换成Map<TaskCode, ITask>,添加任务时通过TaskCode定位到ITask
    @Resource
    private List<ITask> jobs;
    private Map<String, ITask> jobMap;

    @Resource
    private ThreadPoolTaskScheduler taskScheduler;

    public void newScheduleTask(dynamicScheduleTask scheduleTask) {
        String taskCode = scheduleTask.getTaskCode();
        String taskCron = scheduleTask.getTaskCron();

        // 通过taskCode定位Itask,然后添加调度器
        ITask job = jobMap.get(taskCode);
        TaskPO taskPO = new TaskPO();
        BeanUtils.copyProperties(scheduleTask, taskPO);
        job.setTaskInfo(taskPO);

       // Date executionTime = new CronTrigger(taskCron).nextExecutionTime(new SimpleTriggerContext());
       // log.info("executionTime={}",executionTime);

        ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(job, new CronTrigger(taskCron));
        taskHolder.put(taskCode, new dynamicScheduleTask(taskCode, taskCron, scheduledFuture));
        log.info("new task={} cron={}", taskCode, taskCron);
    }

    public void updateScheduleTask(dynamicScheduleTask newScheduleTask) {
        String taskCode = newScheduleTask.getTaskCode();
        String taskCron = newScheduleTask.getTaskCron();
        Integer state = newScheduleTask.getState();

        if (state == 0) {
            removeScheduleTask(taskCode);
            return;
        }

        dynamicScheduleTask oldScheduleTask = taskHolder.get(taskCode);

        if (oldScheduleTask == null) {
            newScheduleTask(newScheduleTask);
            return;
        }

        if (oldScheduleTask != null && !taskCron.equalsIgnoreCase(oldScheduleTask.getTaskCron())) {
            log.info("update task={},cron={} --> {}", taskCode, oldScheduleTask.getTaskCron(), taskCron);
            removeScheduleTask(taskCode);

            newScheduleTask(new dynamicScheduleTask(taskCode, taskCron));
        }
    }

    public void removeScheduleTask(String taskCode) {
        dynamicScheduleTask oldScheduleTask = taskHolder.get(taskCode);
        if (oldScheduleTask != null) {
            oldScheduleTask.getScheduledFuture().cancel(true);
            taskHolder.remove(taskCode);
            log.info("remove task={}", taskCode);
        }
    }

    public void scheduleAll() {
        List<ScheduleInfoEntity> schedules = (List<ScheduleInfoEntity>) scheduleInfoDao.findAll();
        schedules.stream()
                .filter(task -> task.getState() == 1) //过滤task状态,1为启动
                .forEach(task -> {
                    dynamicScheduleTask dynamicScheduleTask = new dynamicScheduleTask();
                    BeanUtils.copyProperties(task, dynamicScheduleTask);
                    newScheduleTask(dynamicScheduleTask);
                });
    }


    @Override
    public void afterPropertiesSet() {
        jobMap = jobs.stream().collect(Collectors.toMap(ITask::getTaskCode, Function.identity()));
    }
}
启动任务扫描与更新

ScheduleInit ApplicationRunner的实现类,用于服务启动后加载数据库中的任务,以及启动线程定时扫描任务表,更新调度器。

ScheduleUpdater 定时扫描数据库做更新

java

体验AI代码助手

代码解读

复制代码

@Component
public class ScheduleInit implements ApplicationRunner {

    @Resource
    private ScheduleUpdater scheduleUpdater;

    @Resource
    private ScheduleTaskService scheduleTaskService;

    @Resource
    private ThreadPoolTaskScheduler taskScheduler;

    @Value("${task.refresh.interval:5}")
    private int refreshInterval;

    @Override
    public void run(ApplicationArguments args) {
        scheduleTaskService.scheduleAll();

        //task update thread
        taskScheduler.scheduleWithFixedDelay(scheduleUpdater, Duration.ofSeconds(refreshInterval));
    }
}

@Slf4j
@Component
public class ScheduleUpdater implements Runnable {

    @Resource
    private ScheduleInfoDao scheduleInfoDao;

    @Resource
    private ScheduleTaskService scheduleTaskService;

    @Override
    public void run() {
        scheduleInfoDao.findAll().forEach(task -> {
            dynamicScheduleTask dynamicScheduleTask = new dynamicScheduleTask();
            BeanUtils.copyProperties(task, dynamicScheduleTask);
            scheduleTaskService.updateScheduleTask(dynamicScheduleTask);
        });
    }

}
任务基类

ITask 动态任务的基类,项目中的定时任务都基于它实现业务逻辑,如FooTask

java

体验AI代码助手

代码解读

复制代码

public interface ITask extends Runnable {

    /**
     * ScheduleTaskService通过taskCode定位到具体的任务
     */
    String getTaskCode();

    /**
     * 传递数据库中的task信息
     *
     * @param taskPO
     */
    default void setTaskInfo(TaskPO taskPO) {

    }
}

@Component
@Slf4j
public class FooTask implements ITask {

    @Override
    public String getTaskCode() {
        return "foo";
    }

    @Override
    public void run() {
        log.info("I am {} task,cron={}, 呱呱呱", getTaskCode(), taskPO.getTaskCron());
    }


    @Override
    public void setTaskInfo(TaskPO taskPO) {
        this.taskPO = taskPO;
    }

    private TaskPO taskPO;
}

@Component
@Slf4j
public class DiDiTask implements ITask {


    @Override
    public String getTaskCode() {
        return "didi";
    }

    @Override
    public void run() {
        log.info("I am {} task, 滴滴滴", getTaskCode());
    }

//    @Override
//    public void setTaskInfo(TaskPO taskPO) {
//        this.taskPO = taskPO;
//    }
//
//    private TaskPO taskPO;
}
其它的数据库访问层及domain

java

体验AI代码助手

代码解读

复制代码

//CrudRepository
public interface ScheduleInfoDao extends CrudRepository<ScheduleInfoEntity,Integer> {

}

//Entity
@Entity
@Table(name = "t_scheduler_info")
@Data
public class ScheduleInfoEntity {
    @Id
    private Integer id;
    private String taskName;
    private String taskCode;
    private String taskCron;
    private Integer state;
}

//传递给ITask,供业务端灵活使用
@Data
public class TaskPO {
    private Integer id;
    private String taskName;
    private String taskCode;
    private String taskCron;
    private Integer state;
    // other info...
}

//ScheduleTaskService使用的对象,主要是hold ScheduledFuture
@Data
public class dynamicScheduleTask {
    private String taskCode;
    private String taskCron;
    private String taskName;
    private Integer state;

    /**
     * 任务添加到调度器之后会返回该对象,可以用于控制任务的取消
     */
    private ScheduledFuture<?> scheduledFuture;

    public dynamicScheduleTask() {

    }

    public dynamicScheduleTask(String taskCode, String taskCron) {
        this.taskCode = taskCode;
        this.taskCron = taskCron;
    }

    public dynamicScheduleTask(String taskCode, String taskCron, ScheduledFuture<?> scheduledFuture) {
        this.taskCode = taskCode;
        this.taskCron = taskCron;
        this.scheduledFuture = scheduledFuture;
    }

    public String getTaskCode() {
        return taskCode;
    }

}

properties

体验AI代码助手

代码解读

复制代码

#数据库配置
# database
spring.datasource.url=jdbc:mysql://localhost:3306/demo?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.druid.initialSize=10
spring.datasource.druid.minIdle=20
spring.datasource.druid.maxActive=20
spring.datasource.druid.query-timeout=60
spring.datasource.druid.max-wait=20000

转载来源:https://juejin.cn/post/7214254999907352634


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
监控 Java 关系型数据库
Spring Boot整合MySQL主从集群同步延迟解决方案
本文针对电商系统在Spring Boot+MyBatis架构下的典型问题(如大促时订单状态延迟、库存超卖误判及用户信息更新延迟)提出解决方案。核心内容包括动态数据源路由(强制读主库)、大事务拆分优化以及延迟感知补偿机制,配合MySQL参数调优和监控集成,有效将主从延迟控制在1秒内。实际测试表明,在10万QPS场景下,订单查询延迟显著降低,超卖误判率下降98%。
|
2月前
|
关系型数据库 MySQL 调度
如何在MySQL中创建定时任务?
MySQL 事件调度器(Event Scheduler)可实现定时任务自动化。例如,每天凌晨清空 `test` 表,并在一个月后自动停止任务。需先启用调度器(`SET GLOBAL event_scheduler = ON`),再创建事件(使用 `CREATE EVENT` 定义执行频率和操作)。推荐用 `TRUNCATE` 提高效率,注意权限与时区设置。为防数据丢失,可结合备份机制。到期后事件自动禁用,建议定期清理。
106 4
|
3月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
765 43
|
5月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
297 1
|
5月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
129 5
|
6月前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
206 9
|
6月前
|
分布式计算 关系型数据库 MySQL
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型 图像处理 光通信 分布式计算 算法语言 信息技术 计算机应用
121 8
|
6月前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
101 1
|
7月前
|
Java 关系型数据库 MySQL
springboot学习五:springboot整合Mybatis 连接 mysql数据库
这篇文章是关于如何使用Spring Boot整合MyBatis来连接MySQL数据库,并进行基本的增删改查操作的教程。
1360 0
springboot学习五:springboot整合Mybatis 连接 mysql数据库
|
6月前
|
关系型数据库 MySQL Java
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
127 0