介绍
一个极简的基于springboot的动态定时任务demo,spring-context模块有对任务调度进行支持,本demo也是基于这个模块进行开发,功能相对简单,用户可以按需改造,比如添加运维界面,在界面上更加灵活的控制任务执行与更新。
基本原理
- 服务启动后,扫描数据库中的task表,将定时任务添加到调度器中
- 起一个固定频率执行的ScheduleUpdater任务去扫描数据库,判断任务如有启停、调整cron的话,就更新调度器中的任务。
依赖的核心API
将任务添加到调度器之后,会返回一个ScheduledFuture对象用于控制任务的取消执行。
java
体验AI代码助手
代码解读
复制代码
//添加任务到调度器
@see org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler#schedule(java.lang.Runnable, org.springframework.scheduling.Trigger)
//取消任务执行
@see java.util.concurrent.Future#cancel
@see java.util.concurrent.ScheduledFuture
具体实现
数据库表
sql
体验AI代码助手
代码解读
复制代码
-- ----------------------------
-- Table structure for t_scheduler_info
-- ----------------------------
DROP TABLE IF EXISTS `t_scheduler_info`;
CREATE TABLE `t_scheduler_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`task_name` varchar(32) NOT NULL,
`task_code` varchar(64) NOT NULL,
`task_cron` varchar(32) NOT NULL,
`state` tinyint DEFAULT 1 NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='任务表';
INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('1', 'foo', 'foo', '0/5 * * * * ?', '1');
INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('2', 'bar', 'bar', '0/9 * * * * ?', '0');
INSERT INTO `demo`.`t_scheduler_info` (`id`, `task_name`, `task_code`, `task_cron`, `state`) VALUES ('3', 'didi', 'didi', '0/10 * * * * ?', '0');
maven依赖
xml
体验AI代码助手
代码解读
复制代码
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.10.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>schedule-demo</artifactId>
<version>1.0.0</version>
<name>schedule-demo</name>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
项目启动类及配置类
java
体验AI代码助手
代码解读
复制代码
@SpringBootApplication
@EnableScheduling //开启定时任务支持
public class ScheduleDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleDemoApplication.class, args);
}
}
@Configuration
@Slf4j
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
}
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
taskScheduler.setThreadNamePrefix("dynamic_schedule_");
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setErrorHandler(t -> {
//TODO
t.printStackTrace();
});
return taskScheduler;
}
}
任务动态更新
java
体验AI代码助手
代码解读
复制代码
@Component
@Slf4j
public final class ScheduleTaskService implements InitializingBean {
//任务注册表,存储所有运行的任务,动态增删基于它来操作
public final static ConcurrentHashMap<String, dynamicScheduleTask> taskHolder = new ConcurrentHashMap<>();
@Resource
private ScheduleInfoDao scheduleInfoDao;
//拿到所有的ITask对象,转换成Map<TaskCode, ITask>,添加任务时通过TaskCode定位到ITask
@Resource
private List<ITask> jobs;
private Map<String, ITask> jobMap;
@Resource
private ThreadPoolTaskScheduler taskScheduler;
public void newScheduleTask(dynamicScheduleTask scheduleTask) {
String taskCode = scheduleTask.getTaskCode();
String taskCron = scheduleTask.getTaskCron();
// 通过taskCode定位Itask,然后添加调度器
ITask job = jobMap.get(taskCode);
TaskPO taskPO = new TaskPO();
BeanUtils.copyProperties(scheduleTask, taskPO);
job.setTaskInfo(taskPO);
// Date executionTime = new CronTrigger(taskCron).nextExecutionTime(new SimpleTriggerContext());
// log.info("executionTime={}",executionTime);
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(job, new CronTrigger(taskCron));
taskHolder.put(taskCode, new dynamicScheduleTask(taskCode, taskCron, scheduledFuture));
log.info("new task={} cron={}", taskCode, taskCron);
}
public void updateScheduleTask(dynamicScheduleTask newScheduleTask) {
String taskCode = newScheduleTask.getTaskCode();
String taskCron = newScheduleTask.getTaskCron();
Integer state = newScheduleTask.getState();
if (state == 0) {
removeScheduleTask(taskCode);
return;
}
dynamicScheduleTask oldScheduleTask = taskHolder.get(taskCode);
if (oldScheduleTask == null) {
newScheduleTask(newScheduleTask);
return;
}
if (oldScheduleTask != null && !taskCron.equalsIgnoreCase(oldScheduleTask.getTaskCron())) {
log.info("update task={},cron={} --> {}", taskCode, oldScheduleTask.getTaskCron(), taskCron);
removeScheduleTask(taskCode);
newScheduleTask(new dynamicScheduleTask(taskCode, taskCron));
}
}
public void removeScheduleTask(String taskCode) {
dynamicScheduleTask oldScheduleTask = taskHolder.get(taskCode);
if (oldScheduleTask != null) {
oldScheduleTask.getScheduledFuture().cancel(true);
taskHolder.remove(taskCode);
log.info("remove task={}", taskCode);
}
}
public void scheduleAll() {
List<ScheduleInfoEntity> schedules = (List<ScheduleInfoEntity>) scheduleInfoDao.findAll();
schedules.stream()
.filter(task -> task.getState() == 1) //过滤task状态,1为启动
.forEach(task -> {
dynamicScheduleTask dynamicScheduleTask = new dynamicScheduleTask();
BeanUtils.copyProperties(task, dynamicScheduleTask);
newScheduleTask(dynamicScheduleTask);
});
}
@Override
public void afterPropertiesSet() {
jobMap = jobs.stream().collect(Collectors.toMap(ITask::getTaskCode, Function.identity()));
}
}
启动任务扫描与更新
ScheduleInit
ApplicationRunner的实现类,用于服务启动后加载数据库中的任务,以及启动线程定时扫描任务表,更新调度器。
ScheduleUpdater
定时扫描数据库做更新
java
体验AI代码助手
代码解读
复制代码
@Component
public class ScheduleInit implements ApplicationRunner {
@Resource
private ScheduleUpdater scheduleUpdater;
@Resource
private ScheduleTaskService scheduleTaskService;
@Resource
private ThreadPoolTaskScheduler taskScheduler;
@Value("${task.refresh.interval:5}")
private int refreshInterval;
@Override
public void run(ApplicationArguments args) {
scheduleTaskService.scheduleAll();
//task update thread
taskScheduler.scheduleWithFixedDelay(scheduleUpdater, Duration.ofSeconds(refreshInterval));
}
}
@Slf4j
@Component
public class ScheduleUpdater implements Runnable {
@Resource
private ScheduleInfoDao scheduleInfoDao;
@Resource
private ScheduleTaskService scheduleTaskService;
@Override
public void run() {
scheduleInfoDao.findAll().forEach(task -> {
dynamicScheduleTask dynamicScheduleTask = new dynamicScheduleTask();
BeanUtils.copyProperties(task, dynamicScheduleTask);
scheduleTaskService.updateScheduleTask(dynamicScheduleTask);
});
}
}
任务基类
ITask
动态任务的基类,项目中的定时任务都基于它实现业务逻辑,如FooTask
java
体验AI代码助手
代码解读
复制代码
public interface ITask extends Runnable {
/**
* ScheduleTaskService通过taskCode定位到具体的任务
*/
String getTaskCode();
/**
* 传递数据库中的task信息
*
* @param taskPO
*/
default void setTaskInfo(TaskPO taskPO) {
}
}
@Component
@Slf4j
public class FooTask implements ITask {
@Override
public String getTaskCode() {
return "foo";
}
@Override
public void run() {
log.info("I am {} task,cron={}, 呱呱呱", getTaskCode(), taskPO.getTaskCron());
}
@Override
public void setTaskInfo(TaskPO taskPO) {
this.taskPO = taskPO;
}
private TaskPO taskPO;
}
@Component
@Slf4j
public class DiDiTask implements ITask {
@Override
public String getTaskCode() {
return "didi";
}
@Override
public void run() {
log.info("I am {} task, 滴滴滴", getTaskCode());
}
// @Override
// public void setTaskInfo(TaskPO taskPO) {
// this.taskPO = taskPO;
// }
//
// private TaskPO taskPO;
}
其它的数据库访问层及domain
java
体验AI代码助手
代码解读
复制代码
//CrudRepository
public interface ScheduleInfoDao extends CrudRepository<ScheduleInfoEntity,Integer> {
}
//Entity
@Entity
@Table(name = "t_scheduler_info")
@Data
public class ScheduleInfoEntity {
@Id
private Integer id;
private String taskName;
private String taskCode;
private String taskCron;
private Integer state;
}
//传递给ITask,供业务端灵活使用
@Data
public class TaskPO {
private Integer id;
private String taskName;
private String taskCode;
private String taskCron;
private Integer state;
// other info...
}
//ScheduleTaskService使用的对象,主要是hold ScheduledFuture
@Data
public class dynamicScheduleTask {
private String taskCode;
private String taskCron;
private String taskName;
private Integer state;
/**
* 任务添加到调度器之后会返回该对象,可以用于控制任务的取消
*/
private ScheduledFuture<?> scheduledFuture;
public dynamicScheduleTask() {
}
public dynamicScheduleTask(String taskCode, String taskCron) {
this.taskCode = taskCode;
this.taskCron = taskCron;
}
public dynamicScheduleTask(String taskCode, String taskCron, ScheduledFuture<?> scheduledFuture) {
this.taskCode = taskCode;
this.taskCron = taskCron;
this.scheduledFuture = scheduledFuture;
}
public String getTaskCode() {
return taskCode;
}
}
properties
体验AI代码助手
代码解读
复制代码
#数据库配置
# database
spring.datasource.url=jdbc:mysql://localhost:3306/demo?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.druid.initialSize=10
spring.datasource.druid.minIdle=20
spring.datasource.druid.maxActive=20
spring.datasource.druid.query-timeout=60
spring.datasource.druid.max-wait=20000
转载来源:https://juejin.cn/post/7214254999907352634