SpringBoot 整合 Quartz 实现分布式调度(五)

简介: 阅读完本文大概需要 10 分钟,本文主要分享内容如下: springboot + quartz + mysql 实现持久化分布式调度 集群环境任务调度测试

4.11、注册监听器(选用)

当然,如果你想在 SpringBoot 里面集成 Quartz 的监听器,操作也很简单!

  • 创建任务调度监听器
@Component
public class SimpleSchedulerListener extends SchedulerListenerSupport {
    @Override
    public void jobScheduled(Trigger trigger) {
        System.out.println("任务被部署时被执行");
    }
    @Override
    public void jobUnscheduled(TriggerKey triggerKey) {
        System.out.println("任务被卸载时被执行");
    }
    @Override
    public void triggerFinalized(Trigger trigger) {
        System.out.println("任务完成了它的使命,光荣退休时被执行");
    }
    @Override
    public void triggerPaused(TriggerKey triggerKey) {
        System.out.println(triggerKey + "(一个触发器)被暂停时被执行");
    }
    @Override
    public void triggersPaused(String triggerGroup) {
        System.out.println(triggerGroup + "所在组的全部触发器被停止时被执行");
    }
    @Override
    public void triggerResumed(TriggerKey triggerKey) {
        System.out.println(triggerKey + "(一个触发器)被恢复时被执行");
    }
    @Override
    public void triggersResumed(String triggerGroup) {
        System.out.println(triggerGroup + "所在组的全部触发器被回复时被执行");
    }
    @Override
    public void jobAdded(JobDetail jobDetail) {
        System.out.println("一个JobDetail被动态添加进来");
    }
    @Override
    public void jobDeleted(JobKey jobKey) {
        System.out.println(jobKey + "被删除时被执行");
    }
    @Override
    public void jobPaused(JobKey jobKey) {
        System.out.println(jobKey + "被暂停时被执行");
    }
    @Override
    public void jobsPaused(String jobGroup) {
        System.out.println(jobGroup + "(一组任务)被暂停时被执行");
    }
    @Override
    public void jobResumed(JobKey jobKey) {
        System.out.println(jobKey + "被恢复时被执行");
    }
    @Override
    public void jobsResumed(String jobGroup) {
        System.out.println(jobGroup + "(一组任务)被恢复时被执行");
    }
    @Override
    public void schedulerError(String msg, SchedulerException cause) {
        System.out.println("出现异常" + msg + "时被执行");
        cause.printStackTrace();
    }
    @Override
    public void schedulerInStandbyMode() {
        System.out.println("scheduler被设为standBy等候模式时被执行");
    }
    @Override
    public void schedulerStarted() {
        System.out.println("scheduler启动时被执行");
    }
    @Override
    public void schedulerStarting() {
        System.out.println("scheduler正在启动时被执行");
    }
    @Override
    public void schedulerShutdown() {
        System.out.println("scheduler关闭时被执行");
    }
    @Override
    public void schedulerShuttingdown() {
        System.out.println("scheduler正在关闭时被执行");
    }
    @Override
    public void schedulingDataCleared() {
        System.out.println("scheduler中所有数据包括jobs, triggers和calendars都被清空时被执行");
    }
}
  • 创建任务触发监听器
@Component
public class SimpleTriggerListener extends TriggerListenerSupport {
    /**
     * Trigger监听器的名称
     * @return
     */
    @Override
    public String getName() {
        return "mySimpleTriggerListener";
    }
    /**
     * Trigger被激发 它关联的job即将被运行
     * @param trigger
     * @param context
     */
    @Override
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        System.out.println("myTriggerListener.triggerFired()");
    }
    /**
     * Trigger被激发 它关联的job即将被运行, TriggerListener 给了一个选择去否决 Job 的执行,如果返回TRUE 那么任务job会被终止
     * @param trigger
     * @param context
     * @return
     */
    @Override
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        System.out.println("myTriggerListener.vetoJobExecution()");
        return false;
    }
    /**
     * 当Trigger错过被激发时执行,比如当前时间有很多触发器都需要执行,但是线程池中的有效线程都在工作,
     * 那么有的触发器就有可能超时,错过这一轮的触发。
     * @param trigger
     */
    @Override
    public void triggerMisfired(Trigger trigger) {
        System.out.println("myTriggerListener.triggerMisfired()");
    }
    /**
     * 任务完成时触发
     * @param trigger
     * @param context
     * @param triggerInstructionCode
     */
    @Override
    public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) {
        System.out.println("myTriggerListener.triggerComplete()");
    }
}
  • 创建任务执行监听器
@Component
public class SimpleJobListener extends JobListenerSupport {
    /**
     * job监听器名称
     * @return
     */
    @Override
    public String getName() {
        return "mySimpleJobListener";
    }
    /**
     * 任务被调度前
     * @param context
     */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        System.out.println("simpleJobListener监听器,准备执行:"+context.getJobDetail().getKey());
    }
    /**
     * 任务调度被拒了
     * @param context
     */
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        System.out.println("simpleJobListener监听器,取消执行:"+context.getJobDetail().getKey());
    }
    /**
     * 任务被调度后
     * @param context
     * @param jobException
     */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        System.out.println("simpleJobListener监听器,执行结束:"+context.getJobDetail().getKey());
    }
}
  • 最后,将监听器注册到Scheduler
@Autowired
private SimpleSchedulerListener simpleSchedulerListener;
@Autowired
private SimpleJobListener simpleJobListener;
@Autowired
private SimpleTriggerListener simpleTriggerListener;
@Bean(name = "scheduler")
public Scheduler scheduler() throws IOException, SchedulerException {
    Scheduler scheduler = schedulerFactoryBean().getScheduler();
    //全局添加监听器
    //添加SchedulerListener监听器
    scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
    // 添加JobListener, 支持带条件匹配监听器
    scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));
    // 添加triggerListener,设置全局监听
    scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());
    return scheduler;
}

4.12、采用项目数据源(选用)

在上面的 Quartz 数据源配置中,我们使用了自定义的数据源,目的是和项目中的数据源实现解耦,当然有的同学不想单独建库,想和项目中数据源保持一致,配置也很简单!

  • quartz.properties配置文件中,去掉org.quartz.jobStore.dataSource配置
#注释掉quartz的数据源配置
#org.quartz.jobStore.dataSource=qzDS
  • QuartzConfig配置类中加入dataSource数据源,并将其注入到quartz
@Autowired
private DataSource dataSource;
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    //...
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setQuartzProperties(propertiesFactoryBean.getObject());
    //使用数据源,自定义数据源
    factory.setDataSource(dataSource);
    //...
    return factory;
}


五、任务调度测试

在实际的部署中,项目都是集群进行部署,因此为了和正式环境一致,我们再新建两个相同的项目来测试一下在集群环境下 quartz 是否可以实现分布式调度,保证任何一个定时任务只有一台机器在运行

理论上,我们只需要将刚刚新建好的项目,重新复制一份,然后修改一下端口号就可以实现本地测试!

因为curd服务只需要一个,因此我们不需要再编写QuartzJobService等增、删、改服务,仅仅保持QuartzConfigDruidConnectionProviderQuartzJobFactoryTfCommandJobquartz.properties类和配置都是相同的就可以了!


91.jpg

  • 依次启动服务quartz-001quartz-002quartz-003,看看效果如何

第一个启动的服务quartz-001会优先加载数据库中已经配置好的定时任务,其他两个服务quartz-002quartz-003都没有主动调度服务

92.jpg


quartz-001

  • 当我们主动关闭quartz-001quartz-002服务主动接收任务调度

94.jpg

quartz-002

  • 当我们主动关闭quartz-002,同样quartz-003服务主动接收任务调度

95.jpg



quartz-003

最终结果,和我们预期效果一致!

六、小结

本文主要围绕springboot + quartz + mysql实现持久化分布式调度进行介绍,所有的代码功能,笔者都亲自测试过,可能内容比较多,鉴于笔者才疏学浅,如果有遗漏的地方,欢迎网友批评指出!

相关文章
|
5天前
|
缓存 NoSQL Java
springboot分布式锁
springboot分布式锁
17 6
|
2天前
|
存储 Java 文件存储
Spring Boot中的分布式文件系统
Spring Boot中的分布式文件系统
|
3天前
|
NoSQL 前端开发 Java
技术笔记:springboot分布式锁组件spring
技术笔记:springboot分布式锁组件spring
|
2天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot中的分布式缓存方案
|
2天前
|
监控 Java BI
Spring Boot中的定时任务调度
Spring Boot中的定时任务调度
|
2天前
|
消息中间件 Java 数据库
Spring Boot中如何实现分布式事务
Spring Boot中如何实现分布式事务
|
2天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot中的分布式缓存方案
|
2天前
|
监控 Java 调度
Spring Boot中的定时任务调度
Spring Boot中的定时任务调度
|
3天前
|
存储 Java 数据库
Spring Boot与分布式事务的最佳实践
Spring Boot与分布式事务的最佳实践
|
3天前
|
缓存 Java 机器人
Spring Boot中如何集成Hazelcast实现分布式缓存
Spring Boot中如何集成Hazelcast实现分布式缓存