一、Quartz 基本介绍
1.1 Quartz 概述
1.2 Quartz特点
1.3 Quartz 集群配置
二、Quartz 原理及流程
2.1 quartz基本原理
2.2 quartz启动流程
三、Spring + Quartz 实现企业级调度的实现示例
3.1 环境信息
3.2 相关代码及配置
四、问题及解决方案
五、相关知识
六、参考资料
总结
一、Quartz 基本介绍
1.1 Quartz 概述
Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者可以到 站点下载 Quartz 的发布版本及其源代码。
1.2 Quartz特点
作为一个优秀的开源调度框架,Quartz 具有以下特点:
强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。
另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。
quartz调度核心元素:
Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。
Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。
Calendar:它是一些日历特定时间点的集合。一个trigger可以包含多个Calendar,以便排除或包含某些时间点。
JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。
1.3 Quartz 集群配置
quartz集群是通过数据库表来感知其他的应用的,各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。
数据库表:以前有12张表,现在只有11张表,现在没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表:
Table nameDescription
QRTZ_CALENDARS
存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS
存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS
存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS
存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE
存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS
存储程序的悲观锁的信息
QRTZ_JOB_DETAILS
存储每一个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS
存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS
Trigger作为Blob类型存储
QRTZ_TRIGGERS
存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS
QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。
二、Quartz 原理及流程
2.1 quartz基本原理
核心元素
Quartz 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任务调度的元数据, scheduler 是实际执行调度的控制器。
在 Quartz 中,trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。这四种 trigger 可以满足企业应用中的绝大部分需求。我们将在企业应用一节中进一步讨论四种 trigger 的功能。
在 Quartz 中,job 用于表示被调度的任务。主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来说,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job 主要有两种属性:volatility 和 durability,其中 volatility 表示任务是否被持久化到数据库存储,而 durability 表示在没有 trigger 关联的时候任务是否被保留。两者都是在值为 true 的时候任务被持久化或保留。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。
在 Quartz 中, scheduler 由 scheduler 工厂创建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,因为 DirectSchedulerFactory 使用起来不够方便,需要作许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。
Quartz 核心元素之间的关系如下图所示:
图 1. Quartz 核心元素关系图
线程视图
在 Quartz 中,有两类线程,Scheduler 调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。
图 2. Quartz 线程视图
Scheduler 调度线程主要有两个: 执行常规调度的线程,和执行 misfired trigger 的线程。常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理。下图描述了这两个线程的基本流程:
图 3. Quartz 调度线程流程图
关于 misfired trigger,我们在企业应用一节中将进一步描述。
数据存储
Quartz 中的 trigger 和 job 需要存储下来才能被使用。Quartz 中有两种存储方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是将 trigger 和 job 存储在内存中,而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在通常应用中,都是使用 JobStoreSupport。
在 Quartz 中,JobStoreSupport 使用一个驱动代理来操作 trigger 和 job 的数据存储:StdJDBCDelegate。StdJDBCDelegate 实现了大部分基于标准 JDBC 的功能接口,但是对于各种数据库来说,需要根据其具体实现的特点做某些特殊处理,因此各种数据库需要扩展 StdJDBCDelegate 以实现这些特殊处理。Quartz 已经自带了一些数据库的扩展实现,可以直接使用,如下图所示:
图 4. Quartz 数据库驱动代理
作为嵌入式数据库的代表,Derby 近来非常流行。如果使用 Derby 数据库,可以使用上图中的 CloudscapeDelegate 作为 trigger 和 job 数据存储的代理类。
2.2 quartz启动流程
若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件,关键组件如下:
ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;
线程池的配置参数如下所示:
1
2
3
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5
1
2
JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;
QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;
另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:
创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理,将在下文详细讨论;
创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理,将在下文详细讨论;
置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;
整个启动流程如下图:
三、Spring + Quartz 实现企业级调度的实现示例
3.1 环境信息
此示例中的环境: Spring 4.1.6.RELEASE + quartz 2.2.1 + Mysql 5.6
3.2 相关代码及配置
3.2.1 Maven 引入
3.2.2 数据库脚本准备
SET FOREIGN_KEY_CHECKS=0;
– —————————-
– Table structure for task_schedule_job
– —————————-
DROP TABLE IF EXISTS task_schedule_job
;
CREATE TABLE task_schedule_job
(
job_id
bigint(20) NOT NULL AUTO_INCREMENT,
create_time
timestamp NULL DEFAULT NULL,
update_time
timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
job_name
varchar(255) DEFAULT NULL,
job_group
varchar(255) DEFAULT NULL,
job_status
varchar(255) DEFAULT NULL,
cron_expression
varchar(255) NOT NULL,
description
varchar(255) DEFAULT NULL,
bean_class
varchar(255) DEFAULT NULL,
is_concurrent
varchar(255) DEFAULT NULL COMMENT ‘1’,
spring_id
varchar(255) DEFAULT NULL,
method_name
varchar(255) NOT NULL
PRIMARY KEY (job_id
),
UNIQUE KEY name_group
(job_name
,job_group
) USING BTREE
) ENGINE=InnoDB AUTOINCREMENT=1 DEFAULT CHARSET=utf8;
在Quartz包下docs/dbTables,选择对应的数据库脚本,创建相应的数据库表即可,我用的是mysql5.6,这里有一个需要注意的地方,mysql5.5之前用的表存储引擎是MyISAM,使用的是表级锁,锁发生冲突的概率比较高,并发度低;5.6之后默认的存储引擎为InnoDB,InnoDB采用的锁机制是行级锁,并发度也较高。而quartz集群使用数据库锁的
机制来来实现同一个任务在同一个时刻只被实例执行,所以为了防止冲突,我们建表的时候要选取InnoDB作为表的存
储引擎。如下:
3.2.3 关键代码及配置
spring-quartz.xml 配置 在application.xml 文件中引入
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy">
quartz.properties 文件配置
#==============================================================
#Configure Main Scheduler Properties
#==============================================================
org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
#==============================================================
#Configure JobStore
#==============================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.txIsolationLevelSerializable = false
#==============================================================
#Configure DataSource
#==============================================================
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = 你的数据链接
org.quartz.dataSource.myDS.user = 用户名
org.quartz.dataSource.myDS.password = 密码
org.quartz.dataSource.myDS.maxConnections = 30
org.quartz.jobStore.selectWithLockSQL = SELECT FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE
#==============================================================
#Configure ThreadPool
#==============================================================
org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 10
org.quartz.threadPool.threadPriority= 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true
#==============================================================
#Skip Check Update
#update:true
#not update:false
#==============================================================
org.quartz.scheduler.skipUpdateCheck = true
#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownhook.cleanShutdown = true
关键代码
package com.netease.ad.omp.service.sys;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import com.netease.ad.omp.quartz.job.JobUtils;
import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean;
import com.netease.ad.omp.quartz.job.QuartzJobFactory;
import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import //代码效果参考:http://hnjlyzjd.com/hw/wz_24709.html
org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
/**
计划任务管理
/
@Service
public class JobTaskService {
public final Logger log = Logger.getLogger(this.getClass());
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private ScheduleJobMapper scheduleJobMapper;
/**
从数据库中取 区别于getAllJob
@return
/
public List getAllTask() {
return scheduleJobMapper.select(null);
}
/**
添加到数据库中 区别于addJob
/
public void addTask(ScheduleJob job) {
job.setCreateTime(new Date());
scheduleJobMapper.insertSelective(job);
}
/**
从数据库中查询job
/
public ScheduleJob getTaskById(Long jobId) {
return scheduleJobMapper.selectByPrimaryKey(jobId);
}
/**
更改任务状态
@throws SchedulerException
/
public void changeStatus(Long jobId, String cmd) throws SchedulerException {
ScheduleJob job = getTaskById(jobId);
if (job == null) {
return;
}
if ("stop".equals(cmd)) {
deleteJob(job);
job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);
} else if ("start".equals(cmd)) {
job.setJobStatus(JobUtils.STATUS_RUNNING);
addJob(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
}
/**
更改任务 cron表达式
@throws SchedulerException
/
public void updateCron(Long jobId, String cron) throws SchedulerException {
ScheduleJob job = getTaskById(jobId);
if (job == null) {
return;
}
job.setCronExpression(cron);
if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
updateJobCron(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
}
/**
添加任务
@throws SchedulerException
*/
public void addJob(ScheduleJob job) throws SchedulerException {
if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
return;
}
Scheduler scheduler = schedulerFactoryBean.getScheduler();
log.debug(scheduler + ".......................................................................................add");
TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 不存在,创建一个
if (null == trigger) {
Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();
jobDetail.getJobDataMap().put("scheduleJob", job);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
// Trigger已存在,那么更新相应的定时设置
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
}
@PostConstruct
public void init() throws Exception {
<span style="c