实战 | 使用Spring Boot + Quartz 实现分布式定时任务平台

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 本文将从项目实战出发来介绍分布式定时任务的实现。在某些应用场景下要求任务必须具备高可用性和可扩展性,单台服务器不能满足业务需求,这时就需要使用Quartz实现分布式定时任务。

本文将从项目实战出发来介绍分布式定时任务的实现。在某些应用场景下要求任务必须具备高可用性和可扩展性,单台服务器不能满足业务需求,这时就需要使用Quartz实现分布式定时任务。

一、分布式任务应用场景

定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。


但是随着业务逐渐增加,定时任务系统必须具备高可用和水平扩展的能力,单台服务器已经不能满足需求。因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。


Quartz的集群功能通过故障转移和负载平衡功能为调度程序带来高可用性和可扩展性。


Quartz是通过数据库表来存储和共享任务信息的。独立的Quartz节点并不与另一个节点或者管理节点通信,而是通过数据库锁机制来调度执行定时任务。


需要注意的是,在集群环境下,时钟必须同步,否则执行时间不一致。


二、Quartz实现分布式定时任务

1. 添加Quartz依赖

首先,引入Quartz中提供分布式处理的JAR包以及数据库和连接相关的依赖。示例代码如下:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- mysql -->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- orm -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

在上面的示例中,除了添加Quartz依赖外,还需要添加mysql-connector-javaspring-boot-starter-data-jpa两个组件,这两个组件主要用于JOB持久化到MySQL数据库。


2. 初始化Quartz数据库

分布式Quartz定时任务的配置信息存储在数据库中,数据库初始化脚本可以在官方网站中查找,默认保存在quartz-2.2.3-distribution\src\org\quartz\impl\jdbcjobstore\tables-mysql.sql目录下。首先创建quartz_jobs数据库,然后在数据库中执行tables-mysql.sql初始化脚本。具体示例如下:

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    JOB_CLASS_NAME   VARCHAR(250) NOT NULL,
    IS_DURABLE VARCHAR(1) NOT NULL,
    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13) NULL,
    PREV_FIRE_TIME BIGINT(13) NULL,
    PRIORITY INTEGER NULL,
    TRIGGER_STATE VARCHAR(16) NOT NULL,
    TRIGGER_TYPE VARCHAR(8) NOT NULL,
    START_TIME BIGINT(13) NOT NULL,
    END_TIME BIGINT(13) NULL,
    CALENDAR_NAME VARCHAR(200) NULL,
    MISFIRE_INSTR SMALLINT(2) NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
        REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    REPEAT_COUNT BIGINT(7) NOT NULL,
    REPEAT_INTERVAL BIGINT(12) NOT NULL,
    TIMES_TRIGGERED BIGINT(10) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CRON_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    CRON_EXPRESSION VARCHAR(200) NOT NULL,
    TIME_ZONE_ID VARCHAR(80),
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_BLOB_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    BLOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_CALENDARS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    CALENDAR_NAME  VARCHAR(200) NOT NULL,
    CALENDAR BLOB NOT NULL,
    PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_GROUP  VARCHAR(200) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
CREATE TABLE QRTZ_FIRED_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    ENTRY_ID VARCHAR(95) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    FIRED_TIME BIGINT(13) NOT NULL,
    SCHED_TIME BIGINT(13) NOT NULL,
    PRIORITY INTEGER NOT NULL,
    STATE VARCHAR(16) NOT NULL,
    JOB_NAME VARCHAR(200) NULL,
    JOB_GROUP VARCHAR(200) NULL,
    IS_NONCONCURRENT VARCHAR(1) NULL,
    REQUESTS_RECOVERY VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
CREATE TABLE QRTZ_SCHEDULER_STATE
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
    PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
CREATE TABLE QRTZ_LOCKS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME  VARCHAR(40) NOT NULL,
    PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);

使用tables-mysql.sql创建表的语句执行完成后,说明Quartz的数据库和表创建成功,我们查看数据库的ER图,如下图所示。

image.png


3. 配置数据库和Quartz

修改application.properties配置文件,配置数据库与Quartz。具体操作如下:

# server.port=8090
# Quartz 数据库
spring.datasource.url=jdbc:mysql://localhost:3306/quartz_jobs?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.max-active=1000
spring.datasource.max-idle=20
spring.datasource.min-idle=5
spring.datasource.initial-size=10
# 是否使用properties作为数据存储
org.quartz.jobStore.useProperties=false
# 数据库中表的命名前缀
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否是一个集群,是不是分布式的任务
org.quartz.jobStore.isClustered=true
# 集群检查周期,单位为毫秒,可以自定义缩短时间。当某一个节点宕机的时候,其他节点等待多久后开始执行任务
org.quartz.jobStore.clusterCheckinInterval=5000
# 单位为毫秒,集群中的节点退出后,再次检查进入的时间间隔
org.quartz.jobStore.misfireThreshold=60000
# 事务隔离级别
org.quartz.jobStore.txIsolationLevelReadCommitted=true
# 存储的事务管理类型
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# 使用的Delegate类型
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 集群的命名,一个集群要有相同的命名
org.quartz.scheduler.instanceName=ClusterQuartz
# 节点的命名,可以自定义。AUTO代表自动生成
org.quartz.scheduler.instanceId=AUTO
# rmi远程协议是否发布
org.quartz.scheduler.rmi.export=false
# rmi远程协议代理是否创建
org.quartz.scheduler.rmi.proxy=false
# 是否使用用户控制的事务环境触发执行任务
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

上面的配置主要是Quartz数据库和Quartz分布式集群相关的属性配置。分布式定时任务的配置存储在数据库中,所以需要配置数据库连接和Quartz配置信息,为Quartz提供数据库配置信息,如数据库、数据表的前缀之类。


4. 定义定时任务

后台定时任务与普通Quartz任务并无差异,只是增加了@PersistJobDataAfterExecution注解和@DisallowConcurrentExecution注解。创建QuartzJob定时任务类并实现Quartz定时任务的具体示例代码如下:

// 持久化
@PersistJobDataAfterExecution
// 禁止并发执行
@DisallowConcurrentExecution
public class QuartzJob extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(QuartzJob.class);
   @Override
   protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
       String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job, time:{"+new Date()+"} ,name:{"+taskName+"}<----");
    }
}

在上面的示例中,创建了QuartzJob定时任务类,使用@PersistJobDataAfterExecution注解持久化任务信息。DisallowConcurrentExecution禁止并发执行,避免同一个任务被多次并发执行。


5. SchedulerConfig配置

创建SchedulerConfig配置类,初始化Quartz分布式集群相关配置,包括集群设置、数据库等。示例代码如下:

@Configuration
public class SchedulerConfig {
   @Autowired
    private DataSource dataSource;
    /**
     * 调度器
     *
     * @return
     * @throws Exception
     */
    @Bean
    public Scheduler scheduler() throws Exception {
       Scheduler scheduler = schedulerFactoryBean().getScheduler();
       return scheduler;
    }
    /**
     * Scheduler工厂类
     *
     * @return
     * @throws IOException
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
       SchedulerFactoryBean factory = new SchedulerFactoryBean();
       factory.setSchedulerName("Cluster_Scheduler");
       factory.setDataSource(dataSource);
       factory.setApplicationContextSchedulerContextKey("applicationContext");
       factory.setTaskExecutor(schedulerThreadPool());
       //factory.setQuartzProperties(quartzProperties());
       factory.setStartupDelay(10);// 延迟10s执行
       return factory;
    }
    /**
     * 配置Schedule线程池
     *
     * @return
     */
    @Bean
    public Executor schedulerThreadPool() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
       executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
       executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
       return executor;
    }
}

在上面的示例中,主要是配置Schedule线程池、配置Quartz数据库、创建Schedule调度器实例等初始化配置。


6. 触发定时任务

配置完成之后,还需要触发定时任务,创建JobStartupRunner类以便在系统启动时触发所有定时任务。示例代码如下:

@Component
public class JobStartupRunner implements CommandLineRunner {
    @Autowired
    SchedulerConfig schedulerConfig;
    private static String TRIGGER_GROUP_NAME = "test_trigger";
    private static String JOB_GROUP_NAME = "test_job";
    @Override
    public void run(String... args) throws Exception {
        Scheduler scheduler;
        try {
            scheduler = schedulerConfig.scheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (null == trigger) {
                Class clazz = QuartzJob.class;
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob").build();
               CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
                trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
                       .withSchedule(scheduleBuilder).build();
               scheduler.scheduleJob(jobDetail, trigger);
                System.out.println("Quartz 创建了job:...:" + jobDetail.getKey());
            } else {
               System.out.println("job已存在:{}" + trigger.getKey());
            }
            TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
            CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
            if (null == trigger2) {
                Class clazz = QuartzJob2.class;
                JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob2").build();
               CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
                trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
                       .withSchedule(scheduleBuilder).build();
               scheduler.scheduleJob(jobDetail2, trigger2);
               System.out.println("Quartz 创建了job:...:{}" + jobDetail2.getKey());
            } else {
               System.out.println("job已存在:{}" + trigger2.getKey());
            }
            scheduler.start();
        } catch (Exception e) {
           System.out.println(e.getMessage());
        }
    }
}

在上面的示例中,为了适应分布式集群,我们在系统启动时触发定时任务,判断任务是否已经创建、是否正在执行。如果集群中的其他示例已经创建了任务,则启动时无须触发任务。


三、 验证测试

配置完成之后,接下来启动任务,测试分布式任务配置是否成功。启动一个实例,可以看到定时任务执行了,然后每10秒钟打印输出一次,如下图所示。

image.png

接下来,模拟分布式部署的情况。我们再启动一个测试程序实例,这样就有两个后台定时任务实例

实例1:

image.png

实例2:

image.png

从上面的日志中可以看到,Quartz JobQuartz Job2交替地在两个任务实例进程中执行,同一时刻同一个任务只有一个进程在执行,这说明已经达到了分布式后台定时任务的效果。


接下来,停止任务实例1,测试任务实例2是否会接管所有任务继续执行。如图10-11所示,停止任务实例1后,任务实例2接管了所有的定时任务。这样如果集群中的某个实例异常了,其他实例能够接管所有的定时任务,确保任务集群的稳定运行。

image.png

最后

以上,我们就把Spring Boot集成Quartz实现分布式定时任务的功能介绍完了。分布式定时任务在应用开发中非常重要的功能模块,希望大家能够熟练掌握。


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
27天前
|
Java 应用服务中间件 Maven
SpringBoot 项目瘦身指南
SpringBoot 项目瘦身指南
41 0
|
1月前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
148 0
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
98 0
|
1天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
6天前
|
Java Spring 容器
SpringBoot 使用Quartz执行定时任务对象时无法注入Bean问题
SpringBoot 使用Quartz执行定时任务对象时无法注入Bean问题
10 1
|
7天前
|
安全 Java 应用服务中间件
江帅帅:Spring Boot 底层级探索系列 03 - 简单配置
江帅帅:Spring Boot 底层级探索系列 03 - 简单配置
24 0
江帅帅:Spring Boot 底层级探索系列 03 - 简单配置
|
9天前
|
XML Java C++
【Spring系列】Sping VS Sping Boot区别与联系
【4月更文挑战第2天】Spring系列第一课:Spring Boot 能力介绍及简单实践
【Spring系列】Sping VS Sping Boot区别与联系
|
17天前
|
SQL Java 调度
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
|
27天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
31 1
|
28天前
|
敏捷开发 监控 前端开发
Spring+SpringMVC+Mybatis的分布式敏捷开发系统架构
Spring+SpringMVC+Mybatis的分布式敏捷开发系统架构
66 0

热门文章

最新文章