Quartz (2): Dynamically Add, Delete, Update, Query, Pause, and Resume Jobs

Summary: Quartz (2) - dynamically adding, deleting, updating, querying, pausing, and resuming jobs.

I. Background



My company recently had a business scenario in which messages must be pushed to customer-service staff manually, but the push may run immediately, run once at a future time, or run repeatedly on a schedule. In addition, a future one-time or repeating task whose start time is still later than the current time must remain editable, and any task must be able to be paused and resumed at any time. These requirements line up nicely with Quartz, so this post is a small proof of concept to make the real implementation easier later.


II. Code Implementation



The configuration file is the same minimal one used in Quartz (1), the basics post; it will be extended gradually in later articles.
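
The exact file lives in that earlier post and is not repeated here. As a rough sketch, a minimal in-memory quartz.properties consistent with the scheduler metadata in the test log at the end of this post (scheduler name, 3-thread SimpleThreadPool, RAMJobStore) would look like:

# assumed minimal configuration; the authoritative version is in Quartz (1)
org.quartz.scheduler.instanceName = QuartzScheduler
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore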

The shared code is shown first:


  1. The job class
import com.example.docker_images.entity.common.CwConstant;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@Slf4j
public class HelloJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // read the payload that was stored in the trigger's JobDataMap
        String payload = context.getTrigger().getJobDataMap().getString(CwConstant.PAYLOAD);
        System.out.println(payload);
        System.out.println("hello world!!!");
    }
}
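
The CwConstant class imported above is not shown in this post. A minimal sketch consistent with how it is used (the actual constant values are assumptions, derived from the allowableValues on JobPayload below) could look like:

package com.example.docker_images.entity.common;
public class CwConstant {
    // key under which the business payload is stored in the trigger's JobDataMap (value assumed)
    public static final String PAYLOAD = "payload";
    // schedule types accepted by JobPayload.scheduleType (values assumed: now / once / repeat)
    public static final class JobType {
        public static final String NOW = "now";
        public static final String ONCE = "once";
        public static final String REPEAT = "repeat";
    }
}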


  2. The JobPayload class
import com.google.gson.Gson;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.quartz.CronTrigger;
@Data
@ApiModel("job类")
public class JobPayload {
    private String name;
    private String group;
    @ApiModelProperty(name = "任务类型",allowableValues = "now,once,repeat")
    private String scheduleType;
    private String cron;
    private String desc;
    private String dataMap;
    public static JobPayload getInstance(CronTrigger trigger){
        JobPayload jobPayload = new JobPayload();
        jobPayload.setName(trigger.getKey().getName());
        jobPayload.setGroup(trigger.getKey().getGroup());
        Gson gson = new Gson();
        jobPayload.setDataMap(gson.toJson(trigger.getJobDataMap()));
        jobPayload.setDesc(trigger.getDescription());
        jobPayload.setCron(trigger.getCronExpression());
        return jobPayload;
    }
}


1. Adding a job


  1. Controller layer
@PostMapping("/initiate")
    public APIResult initiate(@RequestBody JobPayload payload){
        return jobService.initiate(payload);
    }
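
For illustration only (the cron expression and names here are made up; dataMap is the string that HelloJob later prints), a request body that creates a repeating job might look like:

{
  "name": "helloJob",
  "group": "demo",
  "scheduleType": "repeat",
  "cron": "0/10 * * * * ?",
  "desc": "print hello world every 10 seconds",
  "dataMap": "{name:'ljl'}"
}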


  2. Service layer
APIResult initiate(JobPayload payload);


  3. ServiceImpl layer
@Override
    public APIResult initiate(JobPayload payload) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity(payload.getName(), payload.getGroup()).build();
            switch (payload.getScheduleType()){
                case CwConstant.JobType.NOW:
                    Trigger build = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .startNow().usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc()).build();
                    scheduler.scheduleJob(jobDetail,build);
                    scheduler.start();
                    log.info("成功创建即刻执行任务!!!");
                    break;
                case CwConstant.JobType.ONCE:
                    Trigger trigger = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .startAt(DateUtil.parseYYYYMMDD(payload.getCron()))
                            .usingJobData(CwConstant.PAYLOAD,payload.getDataMap())
                            .withDescription(payload.getDesc())
                            .build();
                    scheduler.scheduleJob(jobDetail,trigger);
                    scheduler.start();
                    log.info("成功创建未来执行一次任务!!!");
                    break;
                case CwConstant.JobType.REPEAT:
                    CronTrigger cronTrigger = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc())
                            .withSchedule(cronSchedule(payload.getCron())).build();
                    scheduler.scheduleJob(jobDetail,cronTrigger);
                    scheduler.start();
                    log.info("成功创建重复任务!!!");
                    break;
            }
        } catch (Exception e) {
            log.error("job initiate failure", e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
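
newTrigger() and cronSchedule() above are Quartz's static builder helpers, so the implementation class also needs these static imports (JobBuilder is already referenced by its class name):

import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.TriggerBuilder.newTrigger;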


2. Deleting a job


  1. Controller layer
@DeleteMapping("/delete")
    public APIResult delete(@RequestParam String name,@RequestParam String groupName){
        return jobService.delete(name,groupName);
    }


  2. Service layer
APIResult delete(String name, String groupName);

  3. ServiceImpl layer
@Override
    public APIResult delete(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            TriggerKey triggerKey = new TriggerKey(name, groupName);
            if(scheduler.checkExists(triggerKey)){
                scheduler.pauseTrigger(triggerKey);
                scheduler.unscheduleJob(triggerKey);
                scheduler.deleteJob(new JobKey(name,groupName));
            }
        }catch (Exception e){
            log.error("删除任务失败", e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
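
Note that Scheduler.deleteJob(JobKey) already removes every trigger associated with the job, so the pauseTrigger/unscheduleJob calls above are defensive rather than strictly required. Assuming, as in initiate, that the trigger shares the job's name and group, a shorter sketch of the same deletion would be:

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobKey jobKey = new JobKey(name, groupName);
if (scheduler.checkExists(jobKey)) {
    // deleteJob also unschedules all triggers pointing at this job
    scheduler.deleteJob(jobKey);
}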


3. Updating a job


  1. Controller layer
@PutMapping("/update")
    public APIResult update(@RequestBody JobPayload jobPayload){
        return jobService.update(jobPayload);
    }


  2. Service layer
APIResult update(JobPayload jobPayload);


  3. ServiceImpl layer
@Override
    public APIResult update(JobPayload jobPayload) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            TriggerKey triggerKey = new TriggerKey(jobPayload.getName(), jobPayload.getGroup());
            switch (jobPayload.getScheduleType()){
                case CwConstant.JobType.ONCE:
                    // fetch the existing trigger and replace it only if the start time changed
                    Trigger trigger = scheduler.getTrigger(triggerKey);
                    Date startTime = trigger.getStartTime();
                    Date date = DateUtil.parseYYYYMMDD(jobPayload.getCron());
                    if(!date.equals(startTime)){
                        Trigger build = newTrigger().withIdentity(triggerKey).startAt(date)
                                .withDescription(jobPayload.getDesc())
                                .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap())
                                .build();
                        scheduler.rescheduleJob(triggerKey,build);
                    }
                    break;
                case CwConstant.JobType.REPEAT:
                    // replace the cron trigger only if the cron expression changed
                    CronTrigger cronTrigger = (CronTrigger)scheduler.getTrigger(triggerKey);
                    String cronExpression = cronTrigger.getCronExpression();
                    if(!ObjectUtils.nullSafeEquals(jobPayload.getCron(),cronExpression)){
                        CronTrigger build = newTrigger().withIdentity(triggerKey)
                                .withDescription(jobPayload.getDesc())
                                .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap())
                                .withSchedule(cronSchedule(jobPayload.getCron()))
                                .build();
                        scheduler.rescheduleJob(triggerKey,build);
                    }
                    break;
            }
        }catch (Exception e){
            log.error("修改失败", e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
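
The background section says a task should only be editable while its start time is still in the future, and the implementation above does not enforce that yet. A hedged sketch of such a guard, placed before the switch (reusing JOB_INITIAL_FAILURE only because no dedicated error code appears in this post), might be:

Trigger current = scheduler.getTrigger(triggerKey);
// reject the update once the trigger is missing or its start time has already passed
if (current == null || !current.getStartTime().after(new Date())) {
    return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
}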


4. Querying jobs


  1. Controller layer
@GetMapping("/list")
    public APIResult getJobList(@RequestParam(required = false) String name,@RequestParam(required = false) String groupName){
        return jobService.getJobList(name,groupName);
    }


  2. Service layer
APIResult getJobList(String name, String groupName);


  3. ServiceImpl layer
@Override
    public APIResult getJobList(String name, String groupName) {
        List<JobPayload> jobs = new ArrayList<>();
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            // a specific job was asked for: look up its trigger directly
            if(!StringUtils.isEmpty(name) && !StringUtils.isEmpty(groupName)){
                Trigger trigger = scheduler.getTrigger(new TriggerKey(name, groupName));
                if(trigger instanceof CronTrigger){
                    jobs.add(JobPayload.getInstance((CronTrigger) trigger));
                }
                return APIResult.success(jobs);
            }
            // otherwise list one group, or every group when no group name is given
            Set<JobKey> jobKeys = StringUtils.isEmpty(groupName)
                    ? scheduler.getJobKeys(GroupMatcher.anyGroup())
                    : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
            jobKeys.forEach(jobKey -> {
                try {
                    // the trigger was registered with the same name/group as the job (see initiate)
                    Trigger trigger = scheduler.getTrigger(new TriggerKey(jobKey.getName(), jobKey.getGroup()));
                    if(trigger instanceof CronTrigger){
                        // JobPayload.getInstance only understands cron triggers; simple triggers are skipped
                        jobs.add(JobPayload.getInstance((CronTrigger) trigger));
                    }
                } catch (SchedulerException e) {
                    log.error("任务不存在", e);
                }
            });
        }catch (Exception e){
            log.error("任务列表获取失败", e);
        }
        return APIResult.success(jobs);
    }
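
Since the next section pauses and resumes jobs, the listing could also expose each trigger's state. Quartz reports it through Scheduler.getTriggerState; a hypothetical helper (a "state" field would have to be added to JobPayload to actually return it) could look like:

// returns the state of the trigger that shares the job's name/group:
// one of NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED
private Trigger.TriggerState getTriggerState(Scheduler scheduler, JobKey jobKey) throws SchedulerException {
    return scheduler.getTriggerState(new TriggerKey(jobKey.getName(), jobKey.getGroup()));
}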


5. Pausing and resuming a job


  1. Controller layer
@GetMapping("/pauseJob")
    public APIResult pauseJob(@RequestParam String name,@RequestParam String groupName){
        return jobService.pauseJob(name,groupName);
    }
    @GetMapping("/resumeJob")
    public APIResult resumeJob(@RequestParam String name,@RequestParam String groupName){
        return jobService.resumeJob(name,groupName);
    }


  2. Service layer
APIResult pauseJob(String name, String groupName);
    APIResult resumeJob(String name, String groupName);


  3. ServiceImpl layer
@Override
    public APIResult pauseJob(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.pauseTrigger(new TriggerKey(name, groupName));
        }catch (Exception e){
            log.error("暂停任务失败", e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
    @Override
    public APIResult resumeJob(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.resumeTrigger(new TriggerKey(name, groupName));
        }catch (Exception e){
            log.error("恢复任务失败", e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
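
pauseTrigger and resumeTrigger act on a single trigger. If a job ever has several triggers, Quartz also offers job-level variants that affect all of them, for example:

// pause/resume every trigger associated with the job, not just one named trigger
scheduler.pauseJob(new JobKey(name, groupName));
scheduler.resumeJob(new JobKey(name, groupName));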


III. Test Results



2020-12-27 20:53:01.877  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Using default implementation for ThreadExecutor
2020-12-27 20:53:01.884  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.SchedulerSignalerImpl    : Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-12-27 20:53:01.885  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Quartz Scheduler v.2.2.1 created.
2020-12-27 20:53:01.885  INFO 5076 --- [nio-8088-exec-2] org.quartz.simpl.RAMJobStore             : RAMJobStore initialized.
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Scheduler meta-data: Quartz Scheduler (v2.2.1) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 3 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'QuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.2.1
2020-12-27 20:53:01.892  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Scheduler QuartzScheduler_$_NON_CLUSTERED started.
2020-12-27 20:53:01.892  INFO 5076 --- [nio-8088-exec-2] c.e.d.service.impl.JobServiceImpl        : 成功创建重复任务!!!
{name:'ljl'}
hello world!!!
{name:'ljl'}
hello world!!!
{name:'ljl'}
hello world!!!
{name:'lsr'}
hello world!!!
{name:'lsr'}
hello world!!!

