quartz(二)动态增删改查停止启用job

简介: quartz(二)动态增删改查停止启用job

一、背景



最近公司有一个业务场景,需要手动推送消息给客服,但是这个执行时间可以是立即执行、未来执行一次或者重复执行推送任务。还有一个就是,如果未来执行一次和重复执行的开始时间晚于当前时间,这个任务是可以修改的。还有就是随时停止和启用任务。于是和这个quartz不谋而合。预研一下,以后实现真正的业务时候轻松一点。


二、代码实现



配置文件沿用quartz(一)基础篇,是最简单的配置,以后会逐步加深的。

公共部分代码如下:


  1. 任务job
import com.example.docker_images.entity.common.CwConstant;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@Slf4j
public class HelloJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //通过jobDetail获取数据
        String payload = context.getTrigger().getJobDataMap().getString(CwConstant.PAYLOAD);
        System.out.println(payload);
        System.out.println("hello world!!!");
    }
}


  1. jobPayload类
import com.google.gson.Gson;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.quartz.CronTrigger;
@Data
@ApiModel("job类")
public class JobPayload {
    private String name;
    private String group;
    @ApiModelProperty(name = "任务类型",allowableValues = "now,once,repeat")
    private String scheduleType;
    private String cron;
    private String desc;
    private String dataMap;
    public static JobPayload getInstance(CronTrigger trigger){
        JobPayload jobPayload = new JobPayload();
        jobPayload.setName(trigger.getKey().getName());
        jobPayload.setGroup(trigger.getKey().getGroup());
        Gson gson = new Gson();
        jobPayload.setDataMap(gson.toJson(trigger.getJobDataMap()));
        jobPayload.setDesc(trigger.getDescription());
        jobPayload.setCron(trigger.getCronExpression());
        return jobPayload;
    }
}


1. 新增job


  1. controller层
@PostMapping("/initiate")
    public APIResult initiate(@RequestBody JobPayload payload){
        return jobService.initiate(payload);
    }


  1. service层
APIResult initiate(JobPayload payload);


  1. serviceImpl层
@Override
    public APIResult initiate(JobPayload payload) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity(payload.getName(), payload.getGroup()).build();
            switch (payload.getScheduleType()){
                case CwConstant.JobType.NOW:
                    Trigger build = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .startNow().usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc()).build();
                    scheduler.scheduleJob(jobDetail,build);
                    scheduler.start();
                    log.info("成功创建即刻执行任务!!!");
                    break;
                case CwConstant.JobType.ONCE:
                    Trigger trigger = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .startAt(DateUtil.parseYYYYMMDD(payload.getCron()))
                            .usingJobData(CwConstant.PAYLOAD,payload.getDataMap())
                            .withDescription(payload.getDesc())
                            .build();
                    scheduler.scheduleJob(jobDetail,trigger);
                    scheduler.start();
                    log.info("成功创建未来执行一次任务!!!");
                    break;
                case CwConstant.JobType.REPEAT:
                    CronTrigger cronTrigger = newTrigger().withIdentity(payload.getName(), payload.getGroup())
                            .usingJobData(CwConstant.PAYLOAD,payload.getDataMap()).withDescription(payload.getDesc())
                            .withSchedule(cronSchedule(payload.getCron())).build();
                    scheduler.scheduleJob(jobDetail,cronTrigger);
                    scheduler.start();
                    log.info("成功创建重复任务!!!");
                    break;
            }
        } catch (Exception e) {
            log.info("job initial failure:{}",e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }


2. 删除job


  1. controller层
@DeleteMapping("/delete")
    public APIResult delete(@RequestParam String name,@RequestParam String groupName){
        return jobService.delete(name,groupName);
    }


  1. service层

         


  1. serviceImpl层
@Override
    public APIResult delete(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            TriggerKey triggerKey = new TriggerKey(name, groupName);
            if(scheduler.checkExists(triggerKey)){
                scheduler.pauseTrigger(triggerKey);
                scheduler.unscheduleJob(triggerKey);
                scheduler.deleteJob(new JobKey(name,groupName));
            }
        }catch (Exception e){
            log.info("删除任务失败:{}",e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }


2. 修改job


  1. controller层
@PutMapping("/update")
    public APIResult update(@RequestBody JobPayload jobPayload){
        return jobService.update(jobPayload);
    }


  1. service层
APIResult update(JobPayload jobPayload);


  1. serviceImpl层
@Override
    public APIResult update(JobPayload jobPayload) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            TriggerKey triggerKey = new TriggerKey(jobPayload.getName(), jobPayload.getGroup());
            switch (jobPayload.getScheduleType()){
                case CwConstant.JobType.ONCE:
                    //获取trigger,修改执行频率
                    Trigger trigger = scheduler.getTrigger(triggerKey);
                    Date startTime = trigger.getStartTime();
                    Date date = DateUtil.parseYYYYMMDD(jobPayload.getCron());
                    boolean equals = date.equals(startTime);
                    if(!equals){
                        Trigger build = newTrigger().withIdentity(triggerKey).startAt(date)
                                .withDescription(jobPayload.getDesc())
                                .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap())
                                .build();
                        scheduler.rescheduleJob(triggerKey,build);
                    }
                case CwConstant.JobType.REPEAT:
                    CronTrigger cronTrigger = (CronTrigger)scheduler.getTrigger(triggerKey);
                    String cronExpression = cronTrigger.getCronExpression();
                    if(!ObjectUtils.nullSafeEquals(jobPayload.getCron(),cronExpression)){
                        CronTrigger build = newTrigger().withIdentity(triggerKey)
                                .withDescription(jobPayload.getDesc())
                                .usingJobData(CwConstant.PAYLOAD,jobPayload.getDataMap())
                                .withSchedule(cronSchedule(jobPayload.getCron()))
                                .build();
                        scheduler.rescheduleJob(triggerKey,build);
                    }
            }
        }catch (Exception e){
            log.info("修改失败:{}",e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }


2. 查询job


  1. controller层
@GetMapping("/list")
    public APIResult getJobList(@RequestParam(required = false) String name,@RequestParam String groupName){
        return jobService.getJobList(name,groupName);
    }


  1. service层
APIResult getJobList(String name, String groupName);


  1. serviceImpl层
@Override
    public APIResult getJobList(String name, String groupName) {
        List<JobPayload> jobs = new ArrayList<>();
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            Set<JobKey> jobKeys;
            if(StringUtils.isEmpty(groupName)){
                if(!StringUtils.isEmpty(name)){
                    CronTrigger trigger = (CronTrigger)scheduler.getTrigger(new TriggerKey(name, groupName));
                    jobs.add(JobPayload.getInstance(trigger));
                    return APIResult.success(jobs);
                }
                jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup());
            }else {
                jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
            }
            jobKeys.forEach(jobKey -> {
                try {
                    CronTrigger trigger = (CronTrigger)scheduler.getTrigger(new TriggerKey(jobKey.getName(), jobKey.getGroup()));
                    JobPayload instance = JobPayload.getInstance(trigger);
                    jobs.add(instance);
                } catch (SchedulerException e) {
                    log.info("任务不存在:{}",e);
                }
            });
        }catch (Exception e){
            log.info("任务列表获取失败{}",e);
            return APIResult.success(jobs);
        }
        return APIResult.success(jobs);
    }


2. 停止和启用job


  1. controller层
@GetMapping("/pauseJob")
    public APIResult pauseJob(@RequestParam String name,@RequestParam String groupName){
        return jobService.pauseJob(name,groupName);
    }
    @GetMapping("/resumeJob")
    public APIResult resumeJob(@RequestParam String name,@RequestParam String groupName){
        return jobService.resumeJob(name,groupName);
    }


  1. service层
APIResult pauseJob(String name, String groupName);
    APIResult resumeJob(String name, String groupName);


  1. serviceImpl层
@Override
    public APIResult pauseJob(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.pauseTrigger(new TriggerKey(name, groupName));
        }catch (Exception e){
            log.info("暂停任务失败:{}",e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }
    @Override
    public APIResult resumeJob(String name, String groupName) {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.resumeTrigger(new TriggerKey(name, groupName));
        }catch (Exception e){
            log.info("恢复任务失败:{}",e);
            return APIResult.error(ReturnCodeEnum.JOB_INITIAL_FAILURE);
        }
        return APIResult.success(null);
    }


三、测试结果



2020-12-27 20:53:01.877  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Using default implementation for ThreadExecutor
2020-12-27 20:53:01.884  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.SchedulerSignalerImpl    : Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-12-27 20:53:01.885  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Quartz Scheduler v.2.2.1 created.
2020-12-27 20:53:01.885  INFO 5076 --- [nio-8088-exec-2] org.quartz.simpl.RAMJobStore             : RAMJobStore initialized.
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Scheduler meta-data: Quartz Scheduler (v2.2.1) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 3 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'QuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
2020-12-27 20:53:01.886  INFO 5076 --- [nio-8088-exec-2] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.2.1
2020-12-27 20:53:01.892  INFO 5076 --- [nio-8088-exec-2] org.quartz.core.QuartzScheduler          : Scheduler QuartzScheduler_$_NON_CLUSTERED started.
2020-12-27 20:53:01.892  INFO 5076 --- [nio-8088-exec-2] c.e.d.service.impl.JobServiceImpl        : 成功创建重复任务!!!
{name:'ljl'}
hello world!!!
{name:'ljl'}
hello world!!!
{name:'ljl'}
hello world!!!
{name:'lsr'}
hello world!!!
{name:'lsr'}
hello world!!!


目录
相关文章
|
3月前
|
SQL 数据库 数据安全/隐私保护
SQL Server数据库Owner导致事务复制log reader job无法启动的解决办法
【8月更文挑战第14天】解决SQL Server事务复制Log Reader作业因数据库所有者问题无法启动的方法:首先验证数据库所有者是否有效并具足够权限;若非,使用`ALTER AUTHORIZATION`更改为有效登录名。其次,确认Log Reader使用的登录名拥有读取事务日志所需的角色权限。还需检查复制配置是否准确无误,并验证Log Reader代理的连接信息及参数。重启SQL Server Agent服务或手动启动Log Reader作业亦可能解决问题。最后,审查SQL Server错误日志及Windows事件查看器以获取更多线索。
|
20天前
|
前端开发 Java 数据库连接
javamvc配置,增删改查,文件上传下载。
【10月更文挑战第4天】javamvc配置,增删改查,文件上传下载。
33 1
|
24天前
|
存储 NoSQL API
使用Py2neo进行Neo4j图数据库的增删改查操作
使用Py2neo进行Neo4j图数据库的增删改查操作
42 5
|
25天前
|
数据可视化 API PHP
低代码开发工具-学生管理系统-老师管理增删改查实现
低代码开发工具-学生管理系统-老师管理增删改查实现
28 5
|
13天前
|
JavaScript 前端开发 测试技术
[新手入门]todolist增删改查:vue3+ts版本!
【10月更文挑战第15天】[新手入门]todolist增删改查:vue3+ts版本!
|
2月前
|
SQL 关系型数据库 MySQL
学成在线笔记+踩坑(3)——【内容模块】课程分类查询、课程增改删、课程计划增删改查,统一异常处理+JSR303校验
课程分类查询、课程新增、统一异常处理、统一封装结果类、JSR303校验、修改课程、查询课程计划、新增/修改课程计划
学成在线笔记+踩坑(3)——【内容模块】课程分类查询、课程增改删、课程计划增删改查,统一异常处理+JSR303校验
|
2月前
|
SQL 关系型数据库 MySQL
ThinkPHP6 连接使用数据库,增删改查,find,select,save,insert,insertAll,insertGetId,delete,update方法的用法
本文介绍了在ThinkPHP6框架中如何连接和使用数据库进行增删改查操作。内容包括配置数据库连接信息、使用Db类进行原生MySQL查询、find方法查询单个数据、select方法查询数据集、save方法添加数据、insertAll方法批量添加数据、insertGetId方法添加数据并返回自增主键、delete方法删除数据和update方法更新数据。此外,还说明了如何通过数据库配置文件进行数据库连接信息的配置,并强调了在使用Db类时需要先将其引入。
ThinkPHP6 连接使用数据库,增删改查,find,select,save,insert,insertAll,insertGetId,delete,update方法的用法
|
25天前
|
Java API 数据库
Data jpa 增删改查的方法分别有哪些
Data jpa 增删改查的方法分别有哪些
|
3月前
|
SQL 数据库连接 API
ThinkPHP6实现增删改查接口
ThinkPHP6实现增删改查接口
38 1
|
3月前
|
XML 数据库 数据格式
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】
这篇文章是Spring5框架的实战教程的终结篇,介绍了如何使用注解而非XML配置文件来实现JdbcTemplate的数据库操作,包括增删改查和批量操作,通过创建配置类来注入数据库连接池和JdbcTemplate对象,并展示了完全注解开发形式的项目结构和代码实现。
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】

热门文章

最新文章