环境准备
依赖、application 配置文件
依赖
<!-- Quartz job scheduling (Spring Boot starter) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
application 配置文件:
# Development environment configuration
server:
  # HTTP port of the server
  port: 80
  servlet:
    # Context path of the application
    context-path: /
  tomcat:
    # URI encoding used by Tomcat
    uri-encoding: UTF-8
spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/quartz?useUnicode=true&characterEncoding=utf-8&useSSL=true
    driver-class-name: com.mysql.cj.jdbc.Driver
Quartz数据库脚本
Quartz 自带有数据库模式
- 现成脚本:https://gitee.com/qianwei4712/code-of-shiva/blob/master/quartz/quartz.sql
- 本文统一使用 Cron 方式来创建
注意:cron 方式需要用到的4张数据表:qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details
额外新增保存任务的数据库表:
-- Application-side table holding the user-defined scheduled jobs
-- (separate from Quartz's own QRTZ_* tables).
-- NOTE(review): the unique index below starts with the primary key job_id, so it
-- can never reject a row; confirm whether (job_name, job_group) was intended.
DROP TABLE IF EXISTS quartz_job;

CREATE TABLE quartz_job (
    job_id          bigint(20)   NOT NULL AUTO_INCREMENT    COMMENT '任务ID',
    job_name        varchar(64)  NOT NULL DEFAULT ''        COMMENT '任务名称',
    job_group       varchar(64)  NOT NULL DEFAULT 'DEFAULT' COMMENT '任务组名',
    invoke_target   varchar(500) NOT NULL                   COMMENT '调用目标字符串',
    cron_expression varchar(255) DEFAULT ''                 COMMENT 'cron执行表达式',
    misfire_policy  varchar(20)  DEFAULT '3'                COMMENT '计划执行错误策略(1立即执行 2执行一次 3放弃执行)',
    concurrent      char(1)      DEFAULT '1'                COMMENT '是否并发执行(0允许 1禁止)',
    status          char(1)      DEFAULT '0'                COMMENT '状态(0正常 1暂停)',
    remark          varchar(500) DEFAULT ''                 COMMENT '备注信息',
    PRIMARY KEY (job_id),
    UNIQUE INDEX quartz_job_unique(job_id, job_name, job_group)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='定时任务调度表';
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.DynamicInsert; import org.hibernate.annotations.DynamicUpdate; import javax.persistence.*; import java.io.Serializable; @Data @Builder @NoArgsConstructor @AllArgsConstructor @Entity @DynamicInsert @DynamicUpdate @Table(name="quartz_job") public class QuartzJob implements Serializable { public static final long serialVersionUID = 42L; /** * 任务ID */ @Id @GeneratedValue(strategy = GenerationType.AUTO) @Column(name = "job_id", nullable = false) private Long jobId; /** * 任务名称 */ @Column(name = "job_name", length = 64, nullable = false) private String jobName; /** * 任务组名 */ @Column(name = "job_group", length = 64, nullable = false) private String jobGroup; /** * 调用目标字符串 */ @Column(name = "invoke_target", length = 500, nullable = false) private String invokeTarget; /** * cron执行表达式 */ @Column(name = "cron_expression", length = 255) private String cronExpression; /** * 计划执行错误策略(1立即执行 2执行一次 3放弃执行) */ @Column(name = "misfire_policy", length = 20) private String misfirePolicy; /** * 是否并发执行(0允许 1禁止) */ @Column(name = "concurrent") private String concurrent; /** * 状态(0正常 1暂停) */ @Column(name = "status") private String status; /** * 备注信息 */ @Column(name = "remark", length = 500) private String remark; }
SpringUtils 工具类
作用:获取 ApplicationContext 中的 bean
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static access point to the Spring {@link ApplicationContext}, for code that is not itself
 * managed by Spring and therefore cannot have beans injected.
 *
 * <p>The context is captured when Spring calls {@link #setApplicationContext}; until then the
 * static accessors operate on a {@code null} context.
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    /** Context captured on the first {@link #setApplicationContext} call. */
    private static ApplicationContext applicationContext = null;

    /**
     * Stores the context the first time it is supplied; later calls leave the stored one untouched.
     */
    @Override
    public void setApplicationContext(ApplicationContext arg0) throws BeansException {
        if (SpringUtils.applicationContext != null) {
            return;
        }
        SpringUtils.applicationContext = arg0;
    }

    /** Returns the captured application context (may be {@code null} if not yet set). */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /** Looks up a bean by its name. */
    public static Object getBean(String name) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /** Looks up a bean by its type. */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /** Looks up a bean by name and returns it as the requested type. */
    public static <T> T getBean(String name, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}
ScheduleConstants 静态变量类
静态变量类
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Constants shared by the scheduling code: misfire-policy codes, JobDataMap keys and the
 * job status codes stored in {@code quartz_job.status}.
 */
public class ScheduleConstants {

    /** Misfire policy: default (the cron schedule builder is left unchanged). */
    public static final String MISFIRE_DEFAULT = "0";

    /** Misfire policy: fire immediately (run every misfired execution right away). */
    public static final String MISFIRE_IGNORE_MISFIRES = "1";

    /** Misfire policy: fire once immediately, then continue with the schedule. */
    public static final String MISFIRE_FIRE_AND_PROCEED = "2";

    /** Misfire policy: do nothing and wait for the next regular fire time. */
    public static final String MISFIRE_DO_NOTHING = "3";

    /** Prefix used to build the Quartz job and trigger key names. */
    public static final String TASK_CLASS_NAME = "TASK_CLASS_NAME";

    /** JobDataMap key under which the job definition is stored. */
    public static final String TASK_PROPERTIES = "TASK_PROPERTIES";

    /** Job status codes. */
    public enum Status {
        /** Job is active. */
        NORMAL("0"),
        /** Job is paused. */
        PAUSE("1");

        // final: an enum constant's code must never change after construction.
        private final String value;

        Status(String value) {
            this.value = value;
        }

        /** Returns the status code as persisted in the database. */
        public String getValue() {
            return value;
        }
    }
}
任务方法
准备一个任务方法(后面通过反射调用,故此处不用实现 Job 接口):
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * Sample task bean, registered under the name {@code mysqlJob}.
 *
 * <p>It does not implement Quartz's Job interface: JobInvokeUtil looks the bean up by name and
 * calls {@link #execute(String)} reflectively. The class-level {@code @Slf4j} was dropped because
 * it only generated a second, unused logger next to {@link #logger}.
 */
@Component("mysqlJob")
public class MysqlJob {

    /** DateTimeFormatter is immutable and thread-safe, so a single shared instance is enough. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the current time together with the supplied task parameter.
     *
     * @param param task parameter taken from the job's invoke target string
     */
    public void execute(String param) {
        logger.info("执行 Mysql Job,当前时间:{},任务参数:{}", LocalDateTime.now().format(TIMESTAMP_FORMAT), param);
    }
}
ScheduleConfig 配置代码类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.Properties;
/**
 * Spring configuration that builds the JDBC-backed, clustered Quartz scheduler.
 */
@Configuration
public class ScheduleConfig {

    /**
     * Creates the scheduler factory on top of the application's {@link DataSource}.
     *
     * <p>SchedulerFactoryBean implements InitializingBean, so Spring calls its
     * afterPropertiesSet() once the bean has been configured.
     *
     * @param dataSource data source holding the QRTZ_ tables
     * @return the configured factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);

        // Quartz properties
        Properties prop = new Properties();
        prop.put("org.quartz.scheduler.instanceName", "shivaScheduler");
        prop.put("org.quartz.scheduler.instanceId", "AUTO");

        // Thread pool
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "20");
        prop.put("org.quartz.threadPool.threadPriority", "5");

        // Job store: "org.quartz.jobStore.class" is deliberately NOT set. With setDataSource(...)
        // Spring installs its own LocalDataSourceJobStore; forcing JobStoreTX here makes Quartz
        // look for a Quartz-defined data source and fail with "DataSource name not set" on
        // recent Spring versions.

        // Cluster settings
        prop.put("org.quartz.jobStore.isClustered", "true");
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");

        // Enable for SQL Server:
        // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        factory.setQuartzProperties(prop);

        factory.setSchedulerName("shivaScheduler");
        // Delay scheduler start-up (value is in seconds)
        factory.setStartupDelay(1);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // Optional: overwrite jobs that already exist at start-up, so qrtz_job_details rows do
        // not have to be deleted by hand every time a target object changes.
        factory.setOverwriteExistingJobs(true);
        // Start automatically (this is also the default)
        factory.setAutoStartup(true);
        return factory;
    }
}
ScheduleUtils 调度工具类
最核心的代码
import org.quartz.*;
/**
 * Helpers that translate a {@code QuartzJob} row into a Quartz JobDetail plus CronTrigger and
 * register it with a Scheduler.
 */
public class ScheduleUtils {

    /**
     * Chooses the Quartz job class for a task.
     *
     * @param job task definition
     * @return QuartzJobExecution when {@code concurrent} is "0" (concurrency allowed), otherwise
     *         QuartzDisallowConcurrentExecution
     */
    private static Class<? extends Job> getQuartzJobClass(QuartzJob job) {
        boolean isConcurrent = "0".equals(job.getConcurrent());
        return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class;
    }

    /**
     * Builds the trigger key for a task: name is TASK_CLASS_NAME + jobId, group is jobGroup.
     */
    public static TriggerKey getTriggerKey(Long jobId, String jobGroup) {
        return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
    }

    /**
     * Builds the job key for a task: name is TASK_CLASS_NAME + jobId, group is jobGroup.
     */
    public static JobKey getJobKey(Long jobId, String jobGroup) {
        return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
    }

    /**
     * Registers (or re-registers) a cron-triggered task with the scheduler.
     *
     * <p>An existing job with the same key is deleted first. If the task's status is PAUSE the
     * job is paused right after it has been scheduled; a {@code null} status is treated as
     * "not paused" instead of failing with a NullPointerException after the job was registered.
     *
     * @param scheduler target scheduler
     * @param job       task definition
     * @throws Exception if the misfire policy is unknown or Quartz rejects the job
     */
    public static void createScheduleJob(Scheduler scheduler, QuartzJob job) throws Exception {
        Class<? extends Job> jobClass = getQuartzJobClass(job);

        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        JobKey jobKey = getJobKey(jobId, jobGroup);
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobKey).build();

        // Cron schedule, with the task's misfire policy applied
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);

        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(getTriggerKey(jobId, jobGroup))
                .withSchedule(cronScheduleBuilder).build();

        // Make the task definition available to the job at execution time
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

        // Remove a stale job with the same key before creating it again
        if (scheduler.checkExists(jobKey)) {
            scheduler.deleteJob(jobKey);
        }
        scheduler.scheduleJob(jobDetail, trigger);

        // Constant-first comparison so a null status cannot throw
        if (ScheduleConstants.Status.PAUSE.getValue().equals(job.getStatus())) {
            scheduler.pauseJob(jobKey);
        }
    }

    /**
     * Applies the task's misfire policy to a cron schedule builder.
     *
     * @param job task definition
     * @param cb  builder to configure
     * @return the builder; unchanged for MISFIRE_DEFAULT
     * @throws Exception if the policy is {@code null} or not one of the MISFIRE_* constants
     */
    public static CronScheduleBuilder handleCronScheduleMisfirePolicy(QuartzJob job, CronScheduleBuilder cb) throws Exception {
        String policy = job.getMisfirePolicy();
        // switch on a null String throws NullPointerException, so null is routed to the
        // descriptive exception below.
        if (policy != null) {
            switch (policy) {
                case ScheduleConstants.MISFIRE_DEFAULT:
                    return cb;
                case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
                    return cb.withMisfireHandlingInstructionIgnoreMisfires();
                case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
                    return cb.withMisfireHandlingInstructionFireAndProceed();
                case ScheduleConstants.MISFIRE_DO_NOTHING:
                    return cb.withMisfireHandlingInstructionDoNothing();
                default:
                    break;
            }
        }
        throw new Exception("The task misfire policy '" + policy
                + "' cannot be used in cron schedule tasks");
    }
}
AbstractQuartzJob 抽象任务
这个类将原本 execute
方法执行的任务,下放到了子类重写的 doExecute
方法中
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.Date;
/**
 * Template for the Quartz jobs in this project: {@link #execute} rebuilds the task definition
 * from the JobDataMap and runs before → doExecute → after.
 */
@Slf4j
public abstract class AbstractQuartzJob implements Job {

    /**
     * Per-thread start time, set in {@link #before}.
     */
    private static final ThreadLocal<Date> threadLocal = new ThreadLocal<>();

    /**
     * Runs one execution of the task.
     *
     * <p>Exceptions from {@code before}/{@code doExecute} are logged and passed to
     * {@link #after}; they are not rethrown. {@code after} is called exactly once per execution,
     * and the thread-local is cleared afterwards so pooled worker threads do not keep stale values.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        QuartzJob job = new QuartzJob();
        BeanUtils.copyProperties(context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES), job);

        Exception failure = null;
        try {
            before(context, job);
            doExecute(context, job);
        } catch (Exception e) {
            log.error("任务执行异常 - :", e);
            failure = e;
        }

        try {
            // Outside the try above: a failing after() must not be reported as a task failure
            // and then invoked a second time.
            after(context, job, failure);
        } finally {
            threadLocal.remove();
        }
    }

    /**
     * Hook run before the task; records the start time.
     *
     * @param context Quartz execution context
     * @param job     task definition
     */
    protected void before(JobExecutionContext context, QuartzJob job) {
        threadLocal.set(new Date());
    }

    /**
     * Hook run after the task; does nothing by default.
     *
     * @param context Quartz execution context
     * @param sysJob  task definition
     * @param e       exception raised by the task, or {@code null} if it succeeded
     */
    protected void after(JobExecutionContext context, QuartzJob sysJob, Exception e) {
    }

    /**
     * Performs the actual work; implemented by subclasses.
     *
     * @param context Quartz execution context
     * @param job     task definition
     * @throws Exception any failure raised while executing the task
     */
    protected abstract void doExecute(JobExecutionContext context, QuartzJob job) throws Exception;
}
AbstractQuartzJob 实现类
两个实现类,分了允许并发和不允许并发,差别就是一个是否允许并发任务的注解:
import org.quartz.JobExecutionContext;
/**
 * Job class for tasks that may run concurrently; it carries no concurrency annotation.
 */
public class QuartzJobExecution extends AbstractQuartzJob {
/**
 * Runs the task by delegating to JobInvokeUtil.invokeMethod.
 *
 * @param context Quartz execution context (not used here)
 * @param job task definition whose invoke target is executed
 * @throws Exception any failure raised by the reflective invocation
 */
@Override
protected void doExecute(JobExecutionContext context, QuartzJob job) throws Exception {
JobInvokeUtil.invokeMethod(job);
}
}
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
/**
 * Job class for tasks that must not run concurrently; the only difference from
 * QuartzJobExecution is the @DisallowConcurrentExecution annotation.
 */
@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
/**
 * Runs the task by delegating to JobInvokeUtil.invokeMethod.
 *
 * @param context Quartz execution context (not used here)
 * @param job task definition whose invoke target is executed
 * @throws Exception any failure raised by the reflective invocation
 */
@Override
protected void doExecute(JobExecutionContext context, QuartzJob job) throws Exception {
JobInvokeUtil.invokeMethod(job);
}
}
JobInvokeUtil 反射调用 job 工具类
JobInvokeUtil 通过反射,进行实际的方法调用
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
 * Parses an invoke-target string such as {@code mysqlJob.execute('got it!!!')} or
 * {@code com.example.Task.run(1L, true)} and calls the named method reflectively.
 *
 * <p>NOTE(security): the invoke target selects an arbitrary bean or class and method. If that
 * string can be supplied by untrusted users this amounts to arbitrary code execution; validate
 * it against an allow-list before it is stored or scheduled.
 */
public class JobInvokeUtil {

    /**
     * Executes the method described by the task's invoke target.
     *
     * <p>A target prefix containing more than one dot is treated as a fully qualified class name
     * and instantiated through its no-arg constructor; anything else is looked up as a Spring
     * bean name.
     *
     * @param job task definition
     * @throws Exception if the target cannot be resolved, instantiated or invoked
     */
    public static void invokeMethod(QuartzJob job) throws Exception {
        String invokeTarget = job.getInvokeTarget();
        String beanName = getBeanName(invokeTarget);
        String methodName = getMethodName(invokeTarget);
        List<Object[]> methodParams = getMethodParams(invokeTarget);

        Object bean;
        if (isValidClassName(beanName)) {
            // Class.newInstance() is deprecated since Java 9; going through the constructor
            // also wraps constructor exceptions in InvocationTargetException.
            bean = Class.forName(beanName).getDeclaredConstructor().newInstance();
        } else {
            bean = SpringUtils.getBean(beanName);
        }
        invokeMethod(bean, methodName, methodParams);
    }

    /**
     * Invokes a method on the target object.
     *
     * <p>The method is resolved with getDeclaredMethod, so it must be declared directly on the
     * object's runtime class and its parameter types must be exactly the wrapper types produced
     * by {@link #getMethodParams}.
     *
     * @param bean         target object
     * @param methodName   method name
     * @param methodParams parsed parameters ({@code null} or empty for a no-arg call)
     */
    private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
            throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
            InvocationTargetException {
        if (!CollectionUtils.isEmpty(methodParams)) {
            Method method = bean.getClass().getDeclaredMethod(methodName, getMethodParamsType(methodParams));
            method.invoke(bean, getMethodParamsValue(methodParams));
        } else {
            Method method = bean.getClass().getDeclaredMethod(methodName);
            method.invoke(bean);
        }
    }

    /**
     * Tells whether the name looks like a fully qualified class name.
     *
     * @param invokeTarget name to check
     * @return {@code true} if it contains more than one dot
     */
    public static boolean isValidClassName(String invokeTarget) {
        return StringUtils.countMatches(invokeTarget, ".") > 1;
    }

    /**
     * Extracts the bean or class name: everything before the last dot that precedes "(".
     *
     * @param invokeTarget invoke-target string
     * @return bean name or fully qualified class name
     */
    public static String getBeanName(String invokeTarget) {
        String beanName = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringBeforeLast(beanName, ".");
    }

    /**
     * Extracts the method name: the text between the last dot and "(".
     *
     * @param invokeTarget invoke-target string
     * @return method name
     */
    public static String getMethodName(String invokeTarget) {
        String methodName = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringAfterLast(methodName, ".");
    }

    /**
     * Parses the comma-separated argument list between "(" and ")".
     *
     * <p>Each argument is classified in this order: contains {@code '} → String (quotes removed);
     * {@code true}/{@code false} in any case → Boolean; contains {@code L}/{@code l} → Long;
     * contains {@code D}/{@code d} → Double; anything else → Integer. Arguments are split on
     * every comma, so a quoted string must not itself contain a comma.
     *
     * @param invokeTarget invoke-target string
     * @return list of {@code {value, type}} pairs, or {@code null} when there are no arguments
     */
    public static List<Object[]> getMethodParams(String invokeTarget) {
        String methodStr = StringUtils.substringBetween(invokeTarget, "(", ")");
        if (StringUtils.isEmpty(methodStr)) {
            return null;
        }
        String[] rawParams = methodStr.split(",");
        List<Object[]> parsed = new LinkedList<>();
        for (String rawParam : rawParams) {
            String str = StringUtils.trimToEmpty(rawParam);
            if (StringUtils.contains(str, "'")) {
                // String literal, marked by single quotes
                parsed.add(new Object[]{StringUtils.replace(str, "'", ""), String.class});
            } else if (StringUtils.equalsIgnoreCase(str, "true") || StringUtils.equalsIgnoreCase(str, "false")) {
                // Boolean literal; both spellings are matched case-insensitively
                parsed.add(new Object[]{Boolean.valueOf(str), Boolean.class});
            } else if (StringUtils.containsIgnoreCase(str, "L")) {
                // Long literal: strip the suffix in either case, since Long.valueOf rejects "10l"
                parsed.add(new Object[]{Long.valueOf(StringUtils.replaceChars(str, "Ll", "")), Long.class});
            } else if (StringUtils.containsIgnoreCase(str, "D")) {
                // Double literal: strip the suffix in either case
                parsed.add(new Object[]{Double.valueOf(StringUtils.replaceChars(str, "Dd", "")), Double.class});
            } else {
                // Everything else is treated as an Integer
                parsed.add(new Object[]{Integer.valueOf(str), Integer.class});
            }
        }
        return parsed;
    }

    /**
     * Collects the parameter types from the parsed argument list.
     *
     * @param methodParams list of {@code {value, type}} pairs
     * @return parameter types, in order
     */
    public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) {
        Class<?>[] types = new Class<?>[methodParams.size()];
        int index = 0;
        for (Object[] pair : methodParams) {
            types[index++] = (Class<?>) pair[1];
        }
        return types;
    }

    /**
     * Collects the parameter values from the parsed argument list.
     *
     * @param methodParams list of {@code {value, type}} pairs
     * @return parameter values, in order
     */
    public static Object[] getMethodParamsValue(List<Object[]> methodParams) {
        Object[] values = new Object[methodParams.size()];
        int index = 0;
        for (Object[] pair : methodParams) {
            values[index++] = pair[0];
        }
        return values;
    }
}
启动程序,查看调度器是否启动
2021-10-06 16:26:05.162 INFO 10764 --- [shivaScheduler]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 1 seconds
2021-10-06 16:26:05.306 INFO 10764 --- [shivaScheduler]] org.quartz.core.QuartzScheduler : Scheduler shivaScheduler_$_DESKTOP-OKMJ1351633508761366 started.
QuartzSheduleTaskImpl
先将任务设置为暂停状态,数据库插入成功后,再在调度器新增任务,再手动根据 ID启动任务。
import com.duran.ssmtest.schedule.quartz.respositories.QuartzJob;
import com.duran.ssmtest.schedule.quartz.respositories.QuartzJobRespository;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
public class QuartzSheduleTaskImpl {
@Autowired
private QuartzJobRespository quartzJobRespository;
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
private Scheduler scheduler;
/**
* 项目启动时,初始化定时器
* 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据)
*/
@PostConstruct
public void init() throws Exception {
scheduler = schedulerFactoryBean.getScheduler();
scheduler.clear();
List<QuartzJob> jobList = quartzJobRespository.findAll();
for (QuartzJob job : jobList) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
}
/**
* 新增定时任务
*/
@Transactional(rollbackFor = Exception.class)
public int insertJob(QuartzJob job){
// 先将任务设置为暂停状态
job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
try {
Long jobId = quartzJobRespository.save(job).getJobId();
ScheduleUtils.createScheduleJob(scheduler, job);
return jobId.shortValue();
} catch (Exception e) {
log.error("failed to insertJob", e);
// 手动回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return 0;
}
}
/**
* 修改定时任务的状态:启动任务/暂停任务
*/
@Transactional(rollbackFor = Exception.class)
public int changeStatus(Long jobId, String status) throws SchedulerException {
QuartzJob job = quartzJobRespository.findById(jobId).orElse(null);
if (job == null) {
return 0;
}
job.setStatus(status);
try {
quartzJobRespository.save(job);
} catch (Exception e) {
log.error("failed to changeStatus", e);
// 手动回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return 0;
}
//根据状态来启动或者关闭
if (ScheduleConstants.Status.NORMAL.getValue().equals(status)) {
scheduler.resumeJob(ScheduleUtils.getJobKey(jobId, job.getJobGroup()));
} else if (ScheduleConstants.Status.PAUSE.getValue().equals(status)) {
scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, job.getJobGroup()));
}
return 1;
}
/**
* 删除定时任务
*/
@Transactional(rollbackFor = Exception.class)
public int deleteJob(Long jobId){
QuartzJob job = quartzJobRespository.findById(jobId).orElse(null);
if (job == null) {
return 0;
}
try {
quartzJobRespository.deleteById(jobId);
scheduler.deleteJob(ScheduleUtils.getJobKey(jobId, job.getJobGroup()));
return 1;
} catch (Exception e) {
log.error("failed to insertJob", e);
// 手动回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return 0;
}
}
}
新建任务请求
{
"concurrent": "1",
"cronExpression": "0/10 * * * * ?",
"invokeTarget": "mysqlJob.execute('got it!!!')",
"jobGroup": "mysqlGroup",
"jobName": "新增 mysqlJob 任务",
"misfirePolicy": "1",
"remark": "",
"status": "0"
}