后台定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。但是随着业务逐渐增加,定时任务系统必须具备高可用和水平扩展的能力,单台服务器已经不能满足需求。因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。
分布式任务调度框架几乎是每个大型应用必备的工具,下面我们结合项目实践,对业界普遍使用的开源分布式任务调度框架的使用进行了探究实践,并分析了这几种框架的优劣势和对自身业务的思考。
一、分布式定时任务简介
1.什么是分布式任务?
分布式定时任务就是把分散的、批量的后台定时任务纳入统一的管理调度平台,实现任务的集群管理、调度和分布式部署管理方式。
2.分布式定时任务的特点
实际项目中涉及到分布式任务的业务场景非常多,这就使得我们的定时任务系统应该集管理、调度、任务分配、监控预警为一体的综合调度系统,如何打造一套健壮的、适应不同场景的系统,技术选型尤其重要。针对以上场景我们需要我们的分布式任务系统具备以下能力:
- 支持多种任务类型(shell任务/Java任务/web任务)
- 支持HA,负载均衡和失败转移
- 支持弹性扩容(应对开门红以及促销活动)
- 支持Job Timeout 处理
- 支持统一监控和告警
- 支持任务统一配置
- 支持资源隔离和作业隔离
二、为什么需要分布式定时任务?
定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。但是,为什么还需要分布式呢?主要有如下两点原因:
- 高可用:单机版的定式任务调度只能在一台机器上运行,如果系统出现异常,就会导致整个后台定时任务不可用。这对于互联网企业来说是不可接受的。
- 单机处理极限:单机处理的数据,任务数量是有限的。原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来一个统计需要1小时,现在业务方需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并行处理可以提高单位时间的处理效率,但是单机能力毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。
但我们遇到的问题还不止这些,比如容错功能、失败重试、分片功能、路由负载均衡、管理后台等。这些都是单机的定时任务系统所不具备的,因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。
三、常见开源方案
目前,分布式定时任务框架非常多,而且大部分都已经开源,比较流行的有:xxl-job、elastic-job、quartz等。
- elastic-job,是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。
- xxl-job,是由个人开源的一个轻量级分布式任务调度框架 ,主要分为 调度中心和执行器两部分 , 调度中心在启动初始化的时候,会默认生成执行器的RPC代理对象(http协议调用), 执行器项目启动之后, 调度中心在触发定时器之后通过jobHandle 来调用执行器项目里面的代码,核心功能和elastic-job差不多,同时技术文档比较完善
- quartz 的常见集群方案如下,通过在数据库中配置定时器信息, 以数据库悲观锁的方式达到同一个任务始终只有一个节点在运行,
以表列出了几个代表性的开源分布式任务框架的:
功能 | quartz | elastic-job | xxl-job |
HA | 多节点部署,通过数据库锁来保证只有一个节点执行任务 | 通过zookeeper的注册与发现,可以动态的添加服务器。支持水平扩容 | 集群部署 |
任务分片 | — | 支持 | 支持 |
文档完善 | 完善 | 完善 | 完善 |
管理界面 | 无 | 支持 | 支持 |
难易程度 | 简单 | 较复杂 | 简单 |
公司 | OpenSymphony | 当当网 | 个人 |
缺点 | 没有管理界面,以及不支持任务分片等。不适用于分布式场景 | 需要引入zookeeper , mesos, 增加系统复杂度, 学习成本较高 | 通过获取数据库锁的方式,保证集群中执行任务的唯一性,性能不好。 |
四、基于Quartz实现分布式定时任务解决方案
1.Quartz的集群解决方案
Quartz的单机版本相比大家应该比较熟悉,它的集群方案则是在单机的基础上加上一个公共数据库。通过在数据库中配置定时器信息, 以数据库悲观锁的方式达到同一个任务始终只有一个节点在运行,集群架构如下:
通过上面的架构图可以看到,三个Quartz服务节点共享同一个数据库,如果某一个服务节点失效,那么Job会在其他节点上执行。各个Quartz服务器都遵守基于数据库锁的调度原则,只有获取了锁才能调度后台任务,从而保证了任务执行的唯一性。同时多个节点的异步运行保证了服务的可靠性。
2.实现基于Quartz的分布式定时任务
下面就通过示例,演示如何基于Quartz实现分布式定时任务。
1. 添加Quartz依赖
由于分布式的原因,Quartz中提供分布式处理的JAR包以及数据库和连接相关的依赖。示例代码如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- orm --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency>
在上面的示例中,除了添加Quartz依赖外,还需要添加mysql-connector-java和spring-boot-starter-data-jpa两个组件,这两个组件主要用于JOB持久化到MySQL数据库。
2. 初始化Quartz数据库
分布式Quartz定时任务的配置信息存储在数据库中,数据库初始化脚本可以在官方网站中查找,默认保存在quartz-2.2.3-distribution\src\org\quartz\impl\jdbcjobstore\tables-mysql.sql目录下。首先创建quartz_jobs数据库,然后在数据库中执行tables-mysql.sql初始化脚本。
3. 配置数据库和Quartz
修改application.properties配置文件,配置数据库与Quartz。具体操作如下:
# server.port=8090 # Quartz 数据库 spring.datasource.url=jdbc:mysql://localhost:3306/quartz_jobs?useSSL=false&serverTimezone=UTC spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.max-active=1000 spring.datasource.max-idle=20 spring.datasource.min-idle=5 spring.datasource.initial-size=10 # 是否使用properties作为数据存储 org.quartz.jobStore.useProperties=false # 数据库中表的命名前缀 org.quartz.jobStore.tablePrefix=QRTZ_ # 是否是一个集群,是不是分布式的任务 org.quartz.jobStore.isClustered=true # 集群检查周期,单位为毫秒,可以自定义缩短时间。当某一个节点宕机的时候,其他节点等待多久后开始执行任务 org.quartz.jobStore.clusterCheckinInterval=5000 # 单位为毫秒,集群中的节点退出后,再次检查进入的时间间隔 org.quartz.jobStore.misfireThreshold=60000 # 事务隔离级别 org.quartz.jobStore.txIsolationLevelReadCommitted=true # 存储的事务管理类型 org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX # 使用的Delegate类型 org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate # 集群的命名,一个集群要有相同的命名 org.quartz.scheduler.instanceName=ClusterQuartz # 节点的命名,可以自定义。AUTO代表自动生成 org.quartz.scheduler.instanceId=AUTO # rmi远程协议是否发布 org.quartz.scheduler.rmi.export=false # rmi远程协议代理是否创建 org.quartz.scheduler.rmi.proxy=false # 是否使用用户控制的事务环境触发执行任务 org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
上面的配置主要是Quartz数据库和Quartz分布式集群相关的属性配置。分布式定时任务的配置存储在数据库中,所以需要配置数据库连接和Quartz配置信息,为Quartz提供数据库配置信息,如数据库、数据表的前缀之类。
4. 定义定时任务
后台定时任务与普通Quartz任务并无差异,只是增加了@PersistJobDataAfterExecution注解和@DisallowConcurrentExecution注解。创建QuartzJob定时任务类并实现Quartz定时任务的具体示例代码如下:
// 持久化 @PersistJobDataAfterExecution // 禁止并发执行 @DisallowConcurrentExecution public class QuartzJob extends QuartzJobBean { private static final Logger log = LoggerFactory.getLogger(QuartzJob.class); @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { String taskName = context.getJobDetail().getJobDataMap().getString("name"); log.info("---> Quartz job, time:{"+new Date()+"} ,name:{"+taskName+"}<----"); } }
在上面的示例中,创建了QuartzJob定时任务类,使用@PersistJobDataAfterExecution注解持久化任务信息。DisallowConcurrentExecution禁止并发执行,避免同一个任务被多次并发执行。
5. SchedulerConfig配置
创建SchedulerConfig配置类,初始化Quartz分布式集群相关配置,包括集群设置、数据库等。示例代码如下:
@Configuration public class SchedulerConfig { @Autowired private DataSource dataSource; /** * 调度器 * * @return * @throws Exception */ @Bean public Scheduler scheduler() throws Exception { Scheduler scheduler = schedulerFactoryBean().getScheduler(); return scheduler; } /** * Scheduler工厂类 * * @return * @throws IOException */ @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setSchedulerName("Cluster_Scheduler"); factory.setDataSource(dataSource); factory.setApplicationContextSchedulerContextKey("applicationContext"); factory.setTaskExecutor(schedulerThreadPool()); //factory.setQuartzProperties(quartzProperties()); factory.setStartupDelay(10);// 延迟10s执行 return factory; } /** * 配置Schedule线程池 * * @return */ @Bean public Executor schedulerThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); executor.setQueueCapacity(Runtime.getRuntime().availableProcessors()); return executor; } }
在上面的示例中,主要是配置Schedule线程池、配置Quartz数据库、创建Schedule调度器实例等初始化配置。
6. 触发定时任务
配置完成之后,还需要触发定时任务,创建JobStartupRunner类以便在系统启动时触发所有定时任务。示例代码如下:
@Component public class JobStartupRunner implements CommandLineRunner { @Autowired SchedulerConfig schedulerConfig; private static String TRIGGER_GROUP_NAME = "test_trigger"; private static String JOB_GROUP_NAME = "test_job"; @Override public void run(String... args) throws Exception { Scheduler scheduler; try { scheduler = schedulerConfig.scheduler(); TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (null == trigger) { Class clazz = QuartzJob.class; JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob").build(); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?"); trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME) .withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); System.out.println("Quartz 创建了job:...:" + jobDetail.getKey()); } else { System.out.println("job已存在:{}" + trigger.getKey()); } TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME); CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2); if (null == trigger2) { Class clazz = QuartzJob2.class; JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob2").build(); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?"); trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME) .withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail2, trigger2); System.out.println("Quartz 创建了job:...:{}" + jobDetail2.getKey()); } else { System.out.println("job已存在:{}" + trigger2.getKey()); } scheduler.start(); } catch (Exception e) { System.out.println(e.getMessage()); } } }
在上面的示例中,为了适应分布式集群,我们在系统启动时触发定时任务,判断任务是否已经创建、是否正在执行。如果集群中的其他示例已经创建了任务,则启动时无须触发任务。
7. 验证测试
配置完成之后,接下来启动任务,测试分布式任务配置是否成功。启动一个实例,可以看到定时任务执行了,然后每10秒钟打印输出一次,如下图所示。
接下来,模拟分布式部署的情况。我们再启动一个测试程序实例,这样就有两个后台定时任务实例,如下所示。
后台定时任务实例1的日志输出:
后台定时任务实例2的日志输出:
从上面的日志中可以看到,Quartz Job和Quartz Job2交替地在两个任务实例进程中执行,同一时刻同一个任务只有一个进程在执行,这说明已经达到了分布式后台定时任务的效果。
接下来,停止任务实例1,测试任务实例2是否会接管所有任务继续执行。如下图所示,停止任务实例1后,任务实例2接管了所有的定时任务。这样如果集群中的某个实例异常了,其他实例能够接管所有的定时任务,确保任务集群的稳定运行。
最后
以上,就把分布式后台任务介绍完了,并通过Spring Boot + Quartz 实现了基于Quartz的分布式定时任务解决方案!
分布式任务调度框架几乎是每个大型应用必备的工具,作为程序员、架构师必须熟练掌握。