【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析

简介: 【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析

不提XXLJOB或者其他的调度框架,就看我接触的第一个任务调度框架Quartz(温故而知新)



Quartz的动态暂停 恢复 修改和删除任务


实现动态添加定时任务,先来看一下我们初步要实现的目标效果图,这里我们只在内存中操作,并没有把quartz的任何信息保存到数据库,即使用的是RAMJobStore,


image.png



当然如果你有需要,可以实现成JDBCJobStore,那样任务信息将会更全面。


例如,我们要先列出计划中的定时任务以及正在执行中的定时任务,这里的正在执行中指的是任务已经触发线程还没执行完的情况。


  • 比如每天2点执行一个数据导入操作,这个操作执行时间需要5分钟,在这5分钟之内这个任务才是运行中的任务。
  • 当任务正常时可以使用暂停按钮,任务暂停时可以使用恢复按钮。



trigger各状态说明:


  • None:Trigger已经完成且不会在执行,或找不到该触发器,或Trigger已经被删除.
  • NORMAL:正常状态
  • PAUSED:暂停状态
  • COMPLETE:触发器完成,但是任务可能还正在执行中
  • BLOCKED:线程阻塞状态
  • ERROR:出现错误



定时任务运行工厂类


任务运行入口,即Job实现类,在这里我把它看作工厂类:

/**
 * 定时任务运行工厂类
 * 
 * User: liyd
 * Date: 14-1-3
 * Time: 上午10:11
 */
public class QuartzJobFactory implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        System.out.println("任务成功运行");
        ScheduleJob scheduleJob
 = (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob");
        System.out.println("任务名称 = [" + scheduleJob.getJobName() + "]");
    } 
}
复制代码


这里我们实现的是无状态的Job,如果要实现有状态的Job在以前是实现StatefulJob接口,在我使用的quartz 2.2.1中,StatefulJob接口已经不推荐使用了,换成了注解的方式,只需要给你实现的Job类加上注解@DisallowConcurrentExecution即可实现有状态:

/**
* 定时任务运行工厂类
* <p/>
* User: liyd
* Date: 14-1-3
* Time: 上午10:11
*/
@DisallowConcurrentExecution
public class QuartzJobFactory implements Job {...}
复制代码



创建任务


既然要动态的创建任务,我们的任务信息当然要保存在某个地方了,这里我们新建一个保存任务信息对应的实体类:

/**
 * 计划任务信息
 * User: liyd
 * Date: 14-1-3
 * Time: 上午10:24
 */
public class ScheduleJob {
    /** 任务id */
    private String jobId;
    /** 任务名称 */
    private String jobName;
    /** 任务分组 */
    private String jobGroup;
    /** 任务状态 0禁用 1启用 2删除*/
    private String jobStatus;
    /** 任务运行时间表达式 */
    private String cronExpression;
    /** 任务描述 */
    private String desc;
    getter and setter ....
}
复制代码


接下来我们创建测试数据,实际应用中该数据可以保存在数据库等地方,我们把任务的分组名+任务名作为任务的唯一key,和quartz中的实现方式一致:

/** 计划任务map */
private static Map<String, ScheduleJob> jobMap = new HashMap<String, ScheduleJob>();
static {
    for (int i = 0; i < 5; i++) {
        ScheduleJob job = new ScheduleJob();
        job.setJobId("10001" + i);
        job.setJobName("data_import" + i);
        job.setJobGroup("dataWork");
        job.setJobStatus("1");
        job.setCronExpression("0/5 * * * * ?");
        job.setDesc("数据导入任务");
        addJob(job);
    }
}
/**
 * 添加任务
 * @param scheduleJob
 */
public static void addJob(ScheduleJob scheduleJob) {
    jobMap.put(scheduleJob.getJobGroup() + "_" + scheduleJob.getJobName(), scheduleJob);
}
复制代码


有了调度工厂,有了任务运行入口实现类,有了任务信息,接下来就是创建我们的定时任务了,在这里我把它设计成一个Job对应一个trigger,两者的分组及名称相同,方便管理,条理也比较清晰,在创建任务时如果不存在新建一个,如果已经存在则更新任务,主要代码如下



schedulerFactoryBean 由spring创建注入

Scheduler scheduler = schedulerFactoryBean.getScheduler();
//这里获取任务信息数据
List<ScheduleJob> jobList = DataWorkContext.getAllJob();
for (ScheduleJob job : jobList) {
    TriggerKey T = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
    //获取trigger,即在spring配置文件中定义的 bean id="myTrigger"
    CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
    //不存在,创建一个
    if (null == trigger) {
        JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class)
            .withIdentity(job.getJobName(), job.getJobGroup()).build();
        jobDetail.getJobDataMap().put("scheduleJob", job);
        //表达式调度构建器
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job
            .getCronExpression());
        //按新的cronExpression表达式构建一个新的trigger
        trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();
        scheduler.scheduleJob(jobDetail, trigger);
    } else {
        // Trigger已存在,那么更新相应的定时设置
        //表达式调度构建器
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job
            .getCronExpression());
        //按新的cronExpression表达式重新构建trigger
        trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
            .withSchedule(scheduleBuilder).build();
        //按新的trigger重新设置job执行
        scheduler.rescheduleJob(triggerKey, trigger);
    }
}
复制代码


  • 如此,可以说已经完成了我们的动态任务创建,大功告成了。有了上面的代码,添加和修改任务是不是也会了,顺道解决了?
  • 上面我们创建的5个测试任务,都是5秒执行一次,都将调用QuartzJobFactory的execute方法,但是传入的任务信息参数不同,execute方法中的如下代码就是得到具体的任务信息,包括任务分组和任务名:
ScheduleJob scheduleJob = (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob");
复制代码


  • 有了任务分组和任务名即确定了该任务的唯一性,接下来需要什么操作实现起来是不是就很容易了?
  • 以后需要添加新的定时任务只需要在任务信息列表中加入记录即可,然后在execute方法中通过判断任务分组和任务名来实现你具体的操作。
  • 以上已经初始实现了我们需要的功能,增加和修改也已经可以通过源代码举一反三出来,但是我们在实际开发的时候需要进行测试,如果一个任务是1个小时运行一次的,测试起来是不是很不方便?当然你可以修改任务的运行时间表达式,但相信这不是最好的方法,接下来我们就要实现在不对当前任务信息做任何修改的情况下触发任务,并且该触发只会运行一次作测试用。




计划中的任务


**主要是已经添加到quartz调度器的任务,因为quartz并没有直接提供这样的查询接口,所以我们需要结合JobKey和Trigger来实现,核心代码: **

Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
  GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); 
  Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); 
  List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(); 
  for (JobKey jobKey : jobKeys) { 
    List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); 
    for (Trigger trigger : triggers) { 
      ScheduleJob job = new ScheduleJob(); 
      job.setJobName(jobKey.getName()); 
      job.setJobGroup(jobKey.getGroup()); 
      job.setDesc("触发器:" + trigger.getKey()); 
      Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); 
      job.setJobStatus(triggerState.name()); 
      if (trigger instanceof CronTrigger) { 
         CronTrigger cronTrigger = (CronTrigger) trigger; 
        String cronExpression = cronTrigger.getCronExpression(); 
        job.setCronExpression(cronExpression); 
      } 
       jobList.add(job); 
  } 
}  
复制代码
  • jobList就是我们需要的计划中的任务列表,需要注意一个job可能会有多个trigger的情况,在下面讲到的立即运行一次任务的时候,会生成一个临时的trigger也会出现在这。
  • 这里把一个Job有多个trigger的情况看成是多个任务。包括在实际项目中一般用到的都是CronTrigger ,所以这里我们着重处理了下CronTrigger的情况。




运行中的任务


实现和计划中的任务类似,核心代码:


Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); 
List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size()); 
for (JobExecutionContext executingJob : executingJobs) { 
   ScheduleJob job = new ScheduleJob(); 
   JobDetail jobDetail = executingJob.getJobDetail(); 
   JobKey jobKey = jobDetail.getKey(); 
   Trigger trigger = executingJob.getTrigger(); 
   job.setJobName(jobKey.getName()); 
   job.setJobGroup(jobKey.getGroup()); 
   job.setDesc("触发器:" + trigger.getKey()); 
   Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); 
   job.setJobStatus(triggerState.name()); 
   if (trigger instanceof CronTrigger) { 
     CronTrigger cronTrigger = (CronTrigger) trigger; 
     String cronExpression = cronTrigger.getCronExpression(); 
     job.setCronExpression(cronExpression); 
   } 
   jobList.add(job); 
}  
复制代码



暂停任务机制


比较简单,核心代码:

Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); 
scheduler.pauseJob(jobKey); 
复制代码



恢复任务


暂停任务相对,核心代码:

Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); 
scheduler.resumeJob(jobKey); 
复制代码



删除任务


删除任务后,所对应的trigger也将被删除

Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); 
scheduler.deleteJob(jobKey);  
复制代码



立即运行任务


  • 这里的立即运行,只会运行一次,方便测试时用。quartz是通过临时生成一个trigger的方式来实现的,这个trigger将在本次任务运行完成之后自动删除。
  • trigger的key是随机生成的,例如:DEFAULT.MT_4k9fd10jcn9mg。
  • 在我的测试中,前面的DEFAULT.MT是固定的,后面部分才随机生成。
Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); 
scheduler.triggerJob(jobKey);  
复制代码



更新任务的时间表达式


更新之后,任务将立即按新的时间表达式执行:

Scheduler scheduler = schedulerFactoryBean.getScheduler(); 
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), 
 scheduleJob.getJobGroup()); 
//获取trigger,即在spring配置文件中定义的 bean id="myTrigger" 
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); 
//表达式调度构建器 
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob 
 .getCronExpression()); 
//按新的cronExpression表达式重新构建trigger 
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) 
 .withSchedule(scheduleBuilder).build(); 
//按新的trigger重新设置job执行 
scheduler.rescheduleJob(triggerKey, trigger);  
复制代码

cronExpression表达式:


  • 字段 允许值 允许的特殊字符
  • 秒         0-59       , - * /
  • 分         0-59       , - * /
  • 小时   0-23    , - * /
  • 日期   1-31     , - * ? / L W C
  • 月份   1-12 或者 JAN-DEC   , - * /
  • 星期   1-7 或者 SUN-SAT   , - * ? / L C #
  • 年(可选)   留空, 1970-2099   , - * /
  • 特殊字符 意义
  • *   表示所有值;
  • ?   表示未说明的值,即不关心它为何值;
  • -   表示一个指定的范围;
  • ,   表示附加一个可能值;
  • /   符号前表示开始时间,符号后表示每次递增的值;
  • L W C
  • L("last")   ("last") "L" 用在day-of-month字段意思是 "这个月最后一天";用在 day-of-week字段, 它简单意思是 "7" or "SAT"。如果在day-of-week字段里和数字联合使用,它的意思就是 "这个月的最后一个星期几"
  • 例如: "6L" means "这个月的最后一个星期五". 当我们用“L”时,不指明一个列表值或者范围是很重要的,不然的话,我们会得到一些意想不到的结果。


  • W("weekday")   只能用在day-of-month字段。用来描叙最接近指定天的工作日(周一到周五)。
  • 例如:在day-of-month字段用“15W”指“最接近这个月第15天的工作日”,即如果这个月第15天是周六,那么触发器将会在这个月第14天即周五触发;如果这个月第15天是周日,那么触发器将会在这个月第16 天即周一触发;如果这个月第15天是周二,那么就在触发器这天触发。
  • 注意一点:这个用法只会在当前月计算值,不会越过当前月。“W”字符仅能在day- of-month指明一天,不能是一个范围或列表。也可以用“LW”来指定这个月的最后一个工作日。
  • # 只能用在day-of-week字段。用来指定这个月的第几个周几。例:在day-of-week字段用"6#3"指这个月第3个周五(6指周五,3指第3个)。如果指定的日期不存在,触发器就不会触发。
  • C   指和calendar联系后计算过的值。
  • 例:在day-of-month 字段用“5C”指在这个月第5天或之后包括calendar的第一天;在day-of-week字段用“1C”指在这周日或之后包括calendar的第一天。

- 星期的简写: - 周一 MON - 周二 TUE - 周三 WED - 周四 THU - 周五 FRI - 周六 SAT - 周日 SUN




在MONTH和Day Of Week字段里对字母大小写不敏感


  • 表达式 意义

  • 每天中午12点触发
  • "0 0 12 * * ?"
  • 每天上午10:15触发
  • "0 15 10 ? * *"
  • "0 15 10 * * ?"
  • "0 15 10 * * ? *" (此处最后一项 年是可选的)
  • 2005年的每天上午10:15触发
  • "0 15 10 * * ? 2005"
  • 每天下午2点到下午2:59期间的每1分钟触发
  • "0 * 14 * * ?"
  • 每天下午2点到下午2:55期间的每5分钟触发
  • "0 0/5 14 * * ?"
  • 每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
  • "0 0/5 14,18 * * ?"
  • 每天下午2点到下午2:05期间的每1分钟触发
  • "0 0-5 14 * * ?"  
  • 每年三月的星期三的下午2:10和2:44触发
  • "0 10,44 14 ? 3 WED" / "0 10,44 14 ? 3 WED * "
  • 周一至周五的上午10:15触发
  • "0 15 10 ? * MON-FRI"   / "0 15 10 ? * MON-FRI * "
  • 每月15日上午10:15触发
  • "0 15 10 15 * ?"
  • 每月最后一日的上午10:15触发
  • "0 15 10 L * ?"  
  • 每月的最后一个星期五上午10:15触发
  • "0 15 10 ? * 6L"  
  • 2002年至2005年的每月的最后一个星期五上午10:15触发
  • "0 15 10 ? * 6L 2002-2005"  
  • 每月的第三个星期五上午10:15触发
  • "0 15 10 ? * 6#3"  
  • 每两个小时
  • 0 */2 * * *




相关文章
|
28天前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(二)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
15 0
|
6月前
|
XML 存储 Java
万字+40张图带你探秘小而美的规则引擎框架LiteFlow
在每个公司的系统中,总有一些拥有复杂业务逻辑的系统,这些系统承载着核心业务逻辑,几乎每个需求都和这些核心业务有关,这些核心业务业务逻辑冗长,涉及内部逻辑运算,缓存操作,持久化操作,外部资源调取,内部其他系统RPC调用等等。时间一长,项目几经易手,维护的成本就会越来越高。各种硬代码判断,分支条件越来越多。代码的抽象,复用率也越来越低,各个模块之间的耦合度很高。一小段逻辑的变动,会影响到其他模块,需要进行完整回归测试来验证。如要灵活改变业务流程的顺序,则要进行代码大改动进行抽象,重新写方法。实时热变更业务流程,几乎很难实现。
|
6月前
|
监控 Java 测试技术
103分布式电商项目 - JVM调优(实战篇)
103分布式电商项目 - JVM调优(实战篇)
40 0
|
6月前
|
监控 Dubbo Java
分布式定时任务调度框架实践
分布式定时任务调度框架实践
583 1
|
6月前
|
SQL 分布式计算 算法
【大数据处理框架】Spark大数据处理框架,包括其底层原理、架构、编程模型、生态圈
【大数据处理框架】Spark大数据处理框架,包括其底层原理、架构、编程模型、生态圈
239 0
|
9月前
|
负载均衡
如何手写一个简单的分布式系统框架?
如何手写一个简单的分布式系统框架?
89 0
|
10月前
|
设计模式 缓存 Java
好家伙!阿里新产Java性能优化(终极版),涵盖性能优化所有操作
上月公司来了一位大佬,入职不到一周就把公司现有项目的性能优化了一遍,直接给公司节省了一半的成本。 一问情况,才知道这位仁兄也是一路被虐过来的。去年年底被裁,本以为自己技术还行,看了一段时间面经,复习了基础知识,就开始投大厂简历。阿里最先给他面试机会,结果没能扛过三面,然后是各种大大小小的公司,在实际面试中被碾压得翻不了身。整整一个半月,一个offer都没拿到,最后针对性的恶补,才入职了我司。
|
12月前
|
消息中间件 监控 算法
分布式定时任务框架选型,写的太好了 !
分布式定时任务框架选型,写的太好了 !
|
调度 数据库
任务调度 Quartzh 框架企业级实战案例
任务调度 Quartzh 框架企业级实战案例
|
运维 NoSQL 数据库连接
定时任务能力进击!Quartz框架的使用
定时任务能力进击!Quartz框架的使用