技术笔记:quartz(从原理到应用)详解篇(转)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 技术笔记:quartz(从原理到应用)详解篇(转)

一、Quartz 基本介绍


1.1 Quartz 概述


1.2 Quartz特点


1.3 Quartz 集群配置


二、Quartz 原理及流程


2.1 quartz基本原理


2.2 quartz启动流程


三、Spring + Quartz 实现企业级调度的实现示例


3.1 环境信息


3.2 相关代码及配置


四、问题及解决方案


五、相关知识


六、参考资料


总结


一、Quartz 基本介绍


1.1 Quartz 概述


Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者可以到 站点下载 Quartz 的发布版本及其源代码。


1.2 Quartz特点


作为一个优秀的开源调度框架,Quartz 具有以下特点:


强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;


灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;


分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。


另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。


quartz调度核心元素:


Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。


Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。


Calendar:它是一些日历特定时间点的集合。一个trigger可以包含多个Calendar,以便排除或包含某些时间点。


JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。


Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。


1.3 Quartz 集群配置


quartz集群是通过数据库表来感知其他的应用的,各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。


数据库表:以前有12张表,现在只有11张表,现在没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表:


Table nameDescription


QRTZ_CALENDARS


存储Quartz的Calendar信息


QRTZ_CRON_TRIGGERS


存储CronTrigger,包括Cron表达式和时区信息


QRTZ_FIRED_TRIGGERS


存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息


QRTZ_PAUSED_TRIGGER_GRPS


存储已暂停的Trigger组的信息


QRTZ_SCHEDULER_STATE


存储少量的有关Scheduler的状态信息,和别的Scheduler实例


QRTZ_LOCKS


存储程序的悲观锁的信息


QRTZ_JOB_DETAILS


存储每一个已配置的Job的详细信息


QRTZ_SIMPLE_TRIGGERS


存储简单的Trigger,包括重复次数、间隔、以及已触的次数


QRTZ_BLOG_TRIGGERS


Trigger作为Blob类型存储


QRTZ_TRIGGERS


存储已配置的Trigger的信息


QRTZ_SIMPROP_TRIGGERS


QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。


二、Quartz 原理及流程


2.1 quartz基本原理


核心元素


Quartz 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任务调度的元数据, scheduler 是实际执行调度的控制器。


在 Quartz 中,trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。这四种 trigger 可以满足企业应用中的绝大部分需求。我们将在企业应用一节中进一步讨论四种 trigger 的功能。


在 Quartz 中,job 用于表示被调度的任务。主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来说,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job 主要有两种属性:volatility 和 durability,其中 volatility 表示任务是否被持久化到数据库存储,而 durability 表示在没有 trigger 关联的时候任务是否被保留。两者都是在值为 true 的时候任务被持久化或保留。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。


在 Quartz 中, scheduler 由 scheduler 工厂创建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,因为 DirectSchedulerFactory 使用起来不够方便,需要作许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。


Quartz 核心元素之间的关系如下图所示:


图 1. Quartz 核心元素关系图


线程视图


在 Quartz 中,有两类线程,Scheduler 调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。


图 2. Quartz 线程视图


Scheduler 调度线程主要有两个: 执行常规调度的线程,和执行 misfired trigger 的线程。常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理。下图描述了这两个线程的基本流程:


图 3. Quartz 调度线程流程图


关于 misfired trigger,我们在企业应用一节中将进一步描述。


数据存储


Quartz 中的 trigger 和 job 需要存储下来才能被使用。Quartz 中有两种存储方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是将 trigger 和 job 存储在内存中,而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在通常应用中,都是使用 JobStoreSupport。


在 Quartz 中,JobStoreSupport 使用一个驱动代理来操作 trigger 和 job 的数据存储:StdJDBCDelegate。StdJDBCDelegate 实现了大部分基于标准 JDBC 的功能接口,但是对于各种数据库来说,需要根据其具体实现的特点做某些特殊处理,因此各种数据库需要扩展 StdJDBCDelegate 以实现这些特殊处理。Quartz 已经自带了一些数据库的扩展实现,可以直接使用,如下图所示:


图 4. Quartz 数据库驱动代理


作为嵌入式数据库的代表,Derby 近来非常流行。如果使用 Derby 数据库,可以使用上图中的 CloudscapeDelegate 作为 trigger 和 job 数据存储的代理类。


2.2 quartz启动流程


若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件,关键组件如下:


ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;


线程池的配置参数如下所示:


1


2


3


org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5


1


2


JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;


QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;


另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:


创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理,将在下文详细讨论;


创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理,将在下文详细讨论;


置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;


整个启动流程如下图:


三、Spring + Quartz 实现企业级调度的实现示例


3.1 环境信息


此示例中的环境: Spring 4.1.6.RELEASE + quartz 2.2.1 + Mysql 5.6


3.2 相关代码及配置


3.2.1 Maven 引入


3.2.2 数据库脚本准备


SET FOREIGN_KEY_CHECKS=0;


– —————————-


– Table structure for task_schedule_job


– —————————-


DROP TABLE IF EXISTS task_schedule_job;


CREATE TABLE task_schedule_job (


job_id bigint(20) NOT NULL AUTO_INCREMENT,


create_time timestamp NULL DEFAULT NULL,


update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,


job_name varchar(255) DEFAULT NULL,


job_group varchar(255) DEFAULT NULL,


job_status varchar(255) DEFAULT NULL,


cron_expression varchar(255) NOT NULL,


description varchar(255) DEFAULT NULL,


bean_class varchar(255) DEFAULT NULL,


is_concurrent varchar(255) DEFAULT NULL COMMENT ‘1’,


spring_id varchar(255) DEFAULT NULL,


method_name varchar(255) NOT NULL


PRIMARY KEY (job_id),


UNIQUE KEY name_group (job_name,job_group) USING BTREE


) ENGINE=InnoDB AUTOINCREMENT=1 DEFAULT CHARSET=utf8;


在Quartz包下docs/dbTables,选择对应的数据库脚本,创建相应的数据库表即可,我用的是mysql5.6,这里有一个需要注意的地方,mysql5.5之前用的表存储引擎是MyISAM,使用的是表级锁,锁发生冲突的概率比较高,并发度低;5.6之后默认的存储引擎为InnoDB,InnoDB采用的锁机制是行级锁,并发度也较高。而quartz集群使用数据库锁的


机制来来实现同一个任务在同一个时刻只被实例执行,所以为了防止冲突,我们建表的时候要选取InnoDB作为表的存


储引擎。如下:


3.2.3 关键代码及配置


spring-quartz.xml 配置 在application.xml 文件中引入



xsi:schemaLocation="


http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">




class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy">


quartz.properties 文件配置


#==============================================================


#Configure Main Scheduler Properties


#==============================================================


org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler


org.quartz.scheduler.instanceId = AUTO


#==============================================================


#Configure JobStore


#==============================================================


org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX


org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate


org.quartz.jobStore.tablePrefix = QRTZ


org.quartz.jobStore.isClustered = true


org.quartz.jobStore.clusterCheckinInterval = 20000


org.quartz.jobStore.dataSource = myDS


org.quartz.jobStore.maxMisfiresToHandleAtATime = 1


org.quartz.jobStore.misfireThreshold = 120000


org.quartz.jobStore.txIsolationLevelSerializable = false


#==============================================================


#Configure DataSource


#==============================================================


org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver


org.quartz.dataSource.myDS.URL = 你的数据链接


org.quartz.dataSource.myDS.user = 用户名


org.quartz.dataSource.myDS.password = 密码


org.quartz.dataSource.myDS.maxConnections = 30


org.quartz.jobStore.selectWithLockSQL = SELECT FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE


#==============================================================


#Configure ThreadPool


#==============================================================


org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool


org.quartz.threadPool.threadCount= 10


org.quartz.threadPool.threadPriority= 5


org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true


#==============================================================


#Skip Check Update


#update:true


#not update:false


#==============================================================


org.quartz.scheduler.skipUpdateCheck = true


#============================================================================


# Configure Plugins


#============================================================================


org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin


org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin


org.quartz.plugin.shutdownhook.cleanShutdown = true


关键代码


package com.netease.ad.omp.service.sys;


import java.util.ArrayList;


import java.util.Date;


import java.util.List;


import java.util.Set;


import javax.annotation.PostConstruct;


import javax.annotation.Resource;


import com.netease.ad.omp.common.utils.SpringUtils;


import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper;


import com.netease.ad.omp.entity.sys.ScheduleJob;


import com.netease.ad.omp.quartz.job.JobUtils;


import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean;


import com.netease.ad.omp.quartz.job.QuartzJobFactory;


import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution;


import org.apache.log4j.Logger;


import org.quartz.CronScheduleBuilder;


import org.quartz.CronTrigger;


import org.quartz.JobBuilder;


import org.quartz.JobDetail;


import org.quartz.JobExecutionContext;


import org.quartz.JobKey;


import org.quartz.Scheduler;


import org.quartz.SchedulerException;


import org.quartz.Trigger;


import org.quartz.TriggerBuilder;


import org.quartz.TriggerKey;


import org.quartz.impl.matchers.GroupMatcher;


import //代码效果参考:http://hnjlyzjd.com/hw/wz_24709.html

org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.quartz.SchedulerFactoryBean;


import org.springframework.stereotype.Service;


/**


计划任务管理


/


@Service


public class JobTaskService {


public final Logger log = Logger.getLogger(this.getClass());


@Autowired


private SchedulerFactoryBean schedulerFactoryBean;


@Autowired


private ScheduleJobMapper scheduleJobMapper;


/**


从数据库中取 区别于getAllJob



@return


/


public List getAllTask() {


return scheduleJobMapper.select(null);


}


/**


添加到数据库中 区别于addJob


/


public void addTask(ScheduleJob job) {


job.setCreateTime(new Date());


scheduleJobMapper.insertSelective(job);


}


/**


从数据库中查询job


/


public ScheduleJob getTaskById(Long jobId) {


return scheduleJobMapper.selectByPrimaryKey(jobId);


}


/**


更改任务状态



@throws SchedulerException


/


public void changeStatus(Long jobId, String cmd) throws SchedulerException {


ScheduleJob job = getTaskById(jobId);


if (job == null) {


return;


}


if ("stop".equals(cmd)) {


deleteJob(job);


job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);


} else if ("start".equals(cmd)) {


job.setJobStatus(JobUtils.STATUS_RUNNING);


addJob(job);


}


scheduleJobMapper.updateByPrimaryKeySelective(job);


}


/**


更改任务 cron表达式



@throws SchedulerException


/


public void updateCron(Long jobId, String cron) throws SchedulerException {


ScheduleJob job = getTaskById(jobId);


if (job == null) {


return;


}


job.setCronExpression(cron);


if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {


updateJobCron(job);


}


scheduleJobMapper.updateByPrimaryKeySelective(job);


}


/**


添加任务



@throws SchedulerException


*/


public void addJob(ScheduleJob job) throws SchedulerException {


if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {


return;


}


Scheduler scheduler = schedulerFactoryBean.getScheduler();


log.debug(scheduler + ".......................................................................................add");


TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());


CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);


// 不存在,创建一个


if (null == trigger) {


Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;


JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();


jobDetail.getJobDataMap().put("scheduleJob", job);


CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());


trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();


scheduler.scheduleJob(jobDetail, trigger);


} else {


// Trigger已存在,那么更新相应的定时设置


CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());


// 按新的cronExpression表达式重新构建trigger


trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();


// 按新的trigger重新设置job执行


scheduler.rescheduleJob(triggerKey, trigger);


}


}


@PostConstruct


public void init() throws Exception {


<span style="c

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
7天前
|
存储 Java 数据库连接
技术好文:quartz基本介绍和使用
技术好文:quartz基本介绍和使用
|
运维 NoSQL 数据库连接
Quartz实战与源码解析
Quartz实战与源码解析
|
开发框架 Java API
【Quartz学习总结】——入门程序
【Quartz学习总结】——入门程序
60 0
【Quartz学习总结】——入门程序
|
Java Spring 容器
Aware&原理---Spring源码从入门到精通(十四)
Aware&原理---Spring源码从入门到精通(十四)
Aware&原理---Spring源码从入门到精通(十四)
|
Java 调度 Spring
Quartz学习笔记(一) 初遇篇
Quartz学习笔记(一) 初遇篇
Quartz学习笔记(一) 初遇篇
Quartz - 入门案例
Quartz - 入门案例
70 0
quartz入门(一)下载及简单实例
quartz入门(一)下载及简单实例
130 0
quartz入门(一)下载及简单实例
quartz入门(二)SimpleTrigger简单实例
quartz入门(二)SimpleTrigger简单实例
193 0

热门文章

最新文章