一、背景
最近公司有一个业务场景,需要手动推送消息给客服,但是这个执行时间可以是立即执行、未来执行一次或者重复执行推送任务。还有一个就是,如果未来执行一次和重复执行的开始时间晚于当前时间,这个任务是可以修改的。还有就是随时停止和启用任务。于是和这个quartz不谋而合。预研一下,以后实现真正的业务时候轻松一点。
二、代码实现
配置文件沿用quartz(一)基础篇,是最简单的配置,以后会逐步加深的。
公共部分代码如下:
- 任务job
import com.example.docker_images.entity.common.CwConstant; import lombok.extern.slf4j.Slf4j; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @Slf4j public class HelloJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { //通过jobDetail获取数据 String payload = context.getTrigger().getJobDataMap().getString(CwConstant.PAYLOAD); System.out.println(payload); System.out.println("hello world!!!"); } }
- jobPayload类
import com.google.gson.Gson; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import org.quartz.CronTrigger; @Data @ApiModel("job类") public class JobPayload { private String name; private String group; @ApiModelProperty(name = "任务类型",allowableValues = "now,once,repeat") private String scheduleType; private String cron; private String desc; private String dataMap; public static JobPayload getInstance(CronTrigger trigger){ JobPayload jobPayload = new JobPayload(); jobPayload.setName(trigger.getKey().getName()); jobPayload.setGroup(trigger.getKey().getGroup()); Gson gson = new Gson(); jobPayload.setDataMap(gson.toJson(trigger.getJobDataMap())); jobPayload.setDesc(trigger.getDescription()); jobPayload.setCron(trigger.getCronExpression()); return jobPayload; } }
1. 新增job
- controller层
@PostMapping("/initiate") public APIResult initiate(@RequestBody JobPayload payload){ return jobService.initiate(payload); }
- service层
APIResult initiate(JobPayload payload);
- serviceImpl层
@Override public APIResult initiate(JobPayload payload) { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity(payload.getName(), payload.getGroup()).build(); switch (payload.getScheduleType()){ case CwConstant.JobType.NOW: Trigger build = newTrigger().withIdentity(payload.getName(), payload.getGroup()) .startNow().usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc()).build(); scheduler.scheduleJob(jobDetail,build); scheduler.start(); log.info("成功创建即刻执行任务!!!"); break; case CwConstant.JobType.ONCE: Trigger trigger = newTrigger().withIdentity(payload.getName(), payload.getGroup()) .startAt(DateUtil.parseYYYYMMDD(payload.getCron())) .usingJobData(CwConstant.PAYLOAD,payload.getDataMap()) .withDescription(payload.getDesc()) .build(); scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); log.info("成功创建未来执行一次任务!!!"); break; case CwConstant.JobType.REPEAT: CronTrigger cronTrigger = newTrigger().withIdentity(payload.getName(), payload.getGroup()) .usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc()) .withSchedule(cronSchedule(payload.getCron())).build(); scheduler.scheduleJob(jobDetail,cronTrigger); scheduler.start(); log.info("成功创建重复任务!!!"); break; } } catch (Exception e) { log.info("job initial failure:{}",e); return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE); } return APIResult.success(null); }
2. 删除job
- controller层
@DeleteMapping("/delete") public APIResult delete(@RequestParam String name,@RequestParam String groupName){ return jobService.delete(name,groupName); }
- service层
- serviceImpl层
@Override public APIResult delete(String name, String groupName) { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); TriggerKey triggerKey = new TriggerKey(name, groupName); if(scheduler.checkExists(triggerKey)){ scheduler.pauseTrigger(triggerKey); scheduler.unscheduleJob(triggerKey); scheduler.deleteJob(new JobKey(name,groupName)); } }catch (Exception e){ log.info("删除任务失败:{}",e); return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE); } return APIResult.success(null); }
2. 修改job
- controller层
@PutMapping("/update") public APIResult update(@RequestBody JobPayload jobPayload){ return jobService.update(jobPayload); }
- service层
APIResult update(JobPayload jobPayload);
- serviceImpl层
@Override public APIResult update(JobPayload jobPayload) { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); TriggerKey triggerKey = new TriggerKey(jobPayload.getName(), jobPayload.getGroup()); switch (jobPayload.getScheduleType()){ case CwConstant.JobType.ONCE: //获取trigger,修改执行频率 Trigger trigger = scheduler.getTrigger(triggerKey); Date startTime = trigger.getStartTime(); Date date = DateUtil.parseYYYYMMDD(jobPayload.getCron()); boolean equals = date.equals(startTime); if(!equals){ Trigger build = newTrigger().withIdentity(triggerKey).startAt(date) .withDescription(jobPayload.getDesc()) .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap()) .build(); scheduler.rescheduleJob(triggerKey,build); } case CwConstant.JobType.REPEAT: CronTrigger cronTrigger = (CronTrigger)scheduler.getTrigger(triggerKey); String cronExpression = cronTrigger.getCronExpression(); if(!ObjectUtils.nullSafeEquals(jobPayload.getCron(),cronExpression)){ CronTrigger build = newTrigger().withIdentity(triggerKey) .withDescription(jobPayload.getDesc()) .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap()) .withSchedule(cronSchedule(jobPayload.getCron())) .build(); scheduler.rescheduleJob(triggerKey,build); } } }catch (Exception e){ log.info("修改失败:{}",e); return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE); } return APIResult.success(null); }
2. 查询job
- controller层
@GetMapping("/list") public APIResult getJobList(@RequestParam(required = false) String name,@RequestParam String groupName){ return jobService.getJobList(name,groupName); }
- service层
APIResult getJobList(String name, String groupName);
- serviceImpl层
@Override public APIResult getJobList(String name, String groupName) { List<JobPayload> jobs = new ArrayList<>(); try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); Set<JobKey> jobKeys; if(StringUtils.isEmpty(groupName)){ if(!StringUtils.isEmpty(name)){ CronTrigger trigger = (CronTrigger)scheduler.getTrigger(new TriggerKey(name, groupName)); jobs.add(JobPayload.getInstance(trigger)); return APIResult.success(jobs); } jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup()); }else { jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)); } jobKeys.forEach(jobKey -> { try { CronTrigger trigger = (CronTrigger)scheduler.getTrigger(new TriggerKey(jobKey.getName(), jobKey.getGroup())); JobPayload instance = JobPayload.getInstance(trigger); jobs.add(instance); } catch (SchedulerException e) { log.info("任务不存在:{}",e); } }); }catch (Exception e){ log.info("任务列表获取失败{}",e); return APIResult.success(jobs); } return APIResult.success(jobs); }
2. 停止和启用job
- controller层
@GetMapping("/pauseJob") public APIResult pauseJob(@RequestParam String name,@RequestParam String groupName){ return jobService.pauseJob(name,groupName); } @GetMapping("/resumeJob") public APIResult resumeJob(@RequestParam String name,@RequestParam String groupName){ return jobService.resumeJob(name,groupName); }
- service层
APIResult pauseJob(String name, String groupName); APIResult resumeJob(String name, String groupName);
- serviceImpl层
@Override public APIResult pauseJob(String name, String groupName) { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.pauseTrigger(new TriggerKey(name, groupName)); }catch (Exception e){ log.info("暂停任务失败:{}",e); return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE); } return APIResult.success(null); } @Override public APIResult resumeJob(String name, String groupName) { try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.resumeTrigger(new TriggerKey(name, groupName)); }catch (Exception e){ log.info("恢复任务失败:{}",e); return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE); } return APIResult.success(null); }
三、测试结果
2020-12-27 20:53:01.877 INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory : Using default implementation for ThreadExecutor 2020-12-27 20:53:01.884 INFO 5076 --- [nio-8088-exec-2] org.quartz.core.SchedulerSignalerImpl : Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl 2020-12-27 20:53:01.885 INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler : Quartz Scheduler v.2.2.1 created. 2020-12-27 20:53:01.885 INFO 5076 --- [nio-8088-exec-2] org.quartz.simpl.RAMJobStore : RAMJobStore initialized. 2020-12-27 20:53:01.886 INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler : Scheduler meta-data: Quartz Scheduler (v2.2.1) 'QuartzScheduler' with instanceId 'NON_CLUSTERED' Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally. NOT STARTED. Currently in standby mode. Number of jobs executed: 0 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 3 threads. Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered. 2020-12-27 20:53:01.886 INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory : Quartz scheduler 'QuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties' 2020-12-27 20:53:01.886 INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory : Quartz scheduler version: 2.2.1 2020-12-27 20:53:01.892 INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler : Scheduler QuartzScheduler_$_NON_CLUSTERED started. 2020-12-27 20:53:01.892 INFO 5076 --- [nio-8088-exec-2] c.e.d.service.impl.JobServiceImpl : 成功创建重复任务!!! {name:'ljl'} hello world!!! {name:'ljl'} hello world!!! {name:'ljl'} hello world!!! {name:'lsr'} hello world!!! {name:'lsr'} hello world!!!