任务调度场景
概述
在平时的业务场景中,经常有一些场景需要使用定时任务。
时间驱动的场景
- 某个时间点发送优惠券,发送短信等等
- 生成报表
- 爬虫(定点爬取某排行榜信息)
批量处理数据
批量统计上个月的账单,统计上个月销售数据等等。
固定频率的场景
每隔 5 分钟需要执行一次。
为什么需要任务调度平台
在 Java 中,传统的定时任务实现方案,比如 Timer,Quartz 等,缺点
:不支持集群、不支持统计、没有管理平台、没有失败报警、没有监控等等,分布式的架构中,有一些场景需要分布式任务调度,同一个服务多个实例的任务存在互斥时,需要统一的调度,任务调度需要支持高可用、监控、故障告警。需要统一管理和追踪各个服务节点任务调度的结果,需要记录保存任务属性信息等。
什么是分布式任务调度
任务调度是指基于给定的时间点,给定的时间间隔或者给定执行次数自动的执行任务。任务调度是是操作系统的重要组成部分,而对于实时的操作系统,任务调度直接影响着操作系统的实时性能。任务调度涉及到多线程并发、运行时间规则定制及解析、线程池的维护等诸多方面的工作。
任务调度框架
非分布式
在单一服务器当中,创建定时任务,@Scheduled
。
分布式
把分散的,可靠性差的计划任务纳入统一的平台,并实现集群管理调度和分布式部署的一种定时任务的管理方式,叫做分布式定时任务
Quartz
- 先驱者
- 无图形化界面
- 接口不人性化
Elasticjob
- 基于 Quartz
- elastic-job 是由当当网基于 Quartz 二次开发之后的分布式调度解决方案
- 依赖很多中间件 zk
xxl-job
- 美团点评里面开发者开发出来的
SchedulerX
- 阿里云出的一个框架
- 商用产品
PowerJob
- 个人
对比:
@Scheduled
创建工程:
使用 @Scheduled 添加注解创建 JobTest.java:
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/5 005 22:00 **/ @Slf4j @Component @EnableScheduling public class JobTest { @Scheduled(fixedDelayString = "3000", initialDelay = 5000) public void work() throws InterruptedException { log.info("work任务开始!"); Thread.sleep(5000); } }
启动之后运行效果如下图所示:
- @EnableScheduling:是否要开启 Scheduled 也可以加在启动类上,整个项目生效,@Scheduled
注解中属性的取值的含义,fixedDelayString、fixedDelay 代表固定延时,要等上一次任务结束时,下一次才开始计时,initialDelay
代表延迟启动。
修改一下 JobTest.java 的内容这里还需要在介绍一个 @Scheduled 注解当中的另外一个属性代码如下:
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/5 005 22:00 **/ @Slf4j @Component @EnableScheduling public class JobTest { @Scheduled(fixedRateString = "3000", initialDelay = 5000) public void show() throws InterruptedException { log.info("show任务开始!"); Thread.sleep(5000); } }
启动之后运行效果如下图所示:
- fixedRateString 代表的是等上一个任务结束之后,立马执行下一个任务,不会在等 3 秒,每隔 3 秒执行一次,程序耗时是 5
秒,由于是单线程,要等上一次方法执行完后,立马执行下一次。
异步任务调度
开启异步调度
创建异步任务
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/5 005 22:20 **/ @Slf4j @Component public class MyTask { @Async public void doTask() throws InterruptedException { log.info("doTask任务开始!"); Thread.sleep(5000); log.info("doTask任务结束!"); } }
调用异步任务
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/5 005 22:00 **/ @Slf4j @Component @EnableScheduling public class JobTest { @Resource private MyTask myTask; @Scheduled(fixedRateString = "3000", initialDelay = 5000) public void show() throws InterruptedException { log.info("show任务开始!"); this.myTask.doTask(); log.info("show任务结束!"); } }
启动之后运行效果如下图所示:
开启多线程执行
修改 JobTest.java:
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/5 005 22:00 **/ @Slf4j @Component @EnableScheduling public class JobTest { @Scheduled(fixedRateString = "3000") public void work() throws InterruptedException { log.info("work任务开始!"); Thread.sleep(5000); log.info("work任务结束!"); } @Scheduled(fixedRateString = "3000") public void show() throws InterruptedException { log.info("show任务开始!"); Thread.sleep(5000); log.info("show任务结束!"); } }
修改启动类 DistributedTimedTaskApplication.java:
/** * 分布式定时任务应用程序 * * @author BNTang * @date 2021/10/05 */ @EnableAsync @SpringBootApplication public class DistributedTimedTaskApplication { public static void main(String[] args) { SpringApplication.run(DistributedTimedTaskApplication.class, args); } @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.setPoolSize(10); return threadPoolTaskScheduler; } }
再次启动工程运行效果图如下图所示:
Cron 表达式
什么是 cron 表达式,一个 cron 表达式,最少有 5 个空格,来分割时间元素,Seconds Minutes Hours DayofMonth Month DayofWeek
Year,Day-of-Month
和 Day-of-Week
不可同时为 *
。
组成
通配符
- ,
表示列出枚举值。例如:在 Minutes 域使用 5,20,则意味着在 5 和 20 分每分钟触发一次。
- *
字符代表所有可能的值。* 在子表达式(月)里表示每个月的含义,* 在子表达式(日)表示每一天,在表达式(周)里表示星期的每一天,Day-of-Month
和 Day-of-Week 不可同时为 *。
- ?
表示不指定值。使用的场景为不需要关心当前设置这个字段的值,仅被用于(日)和(周)两个子表达式,表示不指定值当 2
个子表达式其中之一被指定了值以后,为了避免冲突,需要将另一个子表达式的值设为 “?”。
- /
“/” 字符用来指定数值的增量,在子表达式(分钟)里的 “0/15” 表示从第 0 分钟开始,每 15 分钟,在子表达式(分钟)里的 “3/20” 表示从第
3 分钟开始,每 20 分钟,例如在周字段上设置 “MON,WED,FRI” 表示周一,周三和周五触发。
- -
表示范围,例如在 Minutes 域使用 5-20,表示从 5 分到 20 分钟每分钟触发一次。
- L
表示最后的意思,仅被用于(日)和(周)两个子表达式,它是单词 “last” 的缩写。在天(月)子表达式中,“L” 表示一个月的最后一天,在天(星期)自表达式中,“L”
表示一个星期的最后一天,如果在 “L” 前有具体的内容,表示这个月的倒数第几天,6L,最后一个星期五。
- W
表示离指定日期的最近那个工作日(周一至周五),例如在日字段上设置 “15W”,表示离每月 15 号最近的那个工作日触发。如果 15
号正好是周六,则找最近的周五,如果指定格式为 “1W”,它则表示每月 1 号往后最近的工作日触发。
- LW
这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
- #
用于确定每个月第几个星期几,只能出现在 DayofMonth 域。例如 ”2#3” 表示在每月的第三个周二。
QuartZ
官网:http://www.quartz-scheduler.org
引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
创建执行任务:
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/6 006 13:39 **/ @Slf4j public class MyQuartzJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) { log.info("MyQuartzJob任务执行!"); } }
添加配置:
/** * @author BNTang * @version 1.0 * @project distributed-TimedTask * @description * @since Created in 2021/10/6 006 13:41 **/ @Configuration public class MyQuartJobConfig { /** * 可以持久化到数据库当中 * 多个实例共享一个数据时,就可以做到分布式 * 实例化自己定义的Job * * @return {@link JobDetail} */ @Bean public JobDetail jobDetail() { return JobBuilder.newJob(MyQuartzJob.class) .withIdentity("job1", "group1") // 设置持久化 .storeDurably() .build(); } /** * 创建触发器,触发实例 * * @return {@link Trigger} */ @Bean public Trigger trigger() { return TriggerBuilder.newTrigger() .forJob(jobDetail()) .withIdentity("trigger1", "group1") // 立马启动 .startNow() .withSchedule(CronScheduleBuilder.cronSchedule("0,2 * * * * ?")) .build(); } }
再次启动工程运行效果图如下图所示:
xxl-job
官网
概述
xxl-job 是出自大众点评许雪里(xxl 就是作者名字的拼音首字母),的开源项目,官网上介绍这是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。跟elasticjob 不同,xxl-job 环境依赖于 mysql,不用 ZooKeeper,这也是最大的不同。xxl-job 中心式的调度平台轻量级,开箱即用,操作简易,上手快,与SpringBoot 有非常好的集成,而且监控界面就集成在调度中心,界面又简洁。对于企业维护起来成本不高,还有失败的邮件告警等等。这就使很多企业选择xxl-job 做调度平台。
系统组成
调度模块(调度中心)
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块。支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
执行模块(执行器)
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效。接收 “调度中心” 的执行请求、终止请求和日志请求等。
功能架构图
2 个角色
调度任务管理系统,xxl-job-admin,创建任务、设置时间、集群,使用的策略、创建任务时会指定任务名称,地址。xxl-job-excutor,通常是我们业务系统,配置上调度系统的地址,创建任务,在任务上指定任务名称。
搭建调度中心
下载源码:https://github.com/xuxueli/xxl-job/releases/tag/2.3.0
执行 SQL,在下载源码当中的 doc\db\tables_xxl_job.sql
当然我也直接贴在了下方:
# # XXL-JOB v2.3.0 # Copyright (c) 2015-present, xuxueli. CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci; use `xxl_job`; SET NAMES utf8mb4; CREATE TABLE `xxl_job_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `job_group` int(11) NOT NULL COMMENT '执行器主键ID', `job_desc` varchar(255) NOT NULL, `add_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `author` varchar(64) DEFAULT NULL COMMENT '作者', `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件', `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型', `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型', `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略', `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略', `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler', `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数', `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数', `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型', `glue_source` mediumtext COMMENT 'GLUE源代码', `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注', `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间', `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔', `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', PRIMARY KEY (`id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_group` int(11) NOT NULL COMMENT '执行器主键ID', `job_id` int(11) NOT NULL COMMENT '任务,主键ID', `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址', `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler', `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数', `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2', `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数', `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间', `trigger_code` int(11) NOT NULL COMMENT '调度-结果', `trigger_msg` text COMMENT '调度-日志', `handle_time` datetime DEFAULT NULL COMMENT '执行-时间', `handle_code` int(11) NOT NULL COMMENT '执行-状态', `handle_msg` text COMMENT '执行-日志', `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败', PRIMARY KEY (`id`), KEY `I_trigger_time` (`trigger_time`), KEY `I_handle_code` (`handle_code`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_log_report` ( `id` int(11) NOT NULL AUTO_INCREMENT, `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间', `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量', `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量', `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量', `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_logglue` ( `id` int(11) NOT NULL AUTO_INCREMENT, `job_id` int(11) NOT NULL COMMENT '任务,主键ID', `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型', `glue_source` mediumtext COMMENT 'GLUE源代码', `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注', `add_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_registry` ( `id` int(11) NOT NULL AUTO_INCREMENT, `registry_group` varchar(50) NOT NULL, `registry_key` varchar(255) NOT NULL, `registry_value` varchar(255) NOT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `i_g_k_v` (`registry_group`, `registry_key`, `registry_value`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_group` ( `id` int(11) NOT NULL AUTO_INCREMENT, `app_name` varchar(64) NOT NULL COMMENT '执行器AppName', `title` varchar(12) NOT NULL COMMENT '执行器名称', `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入', `address_list` text COMMENT '执行器地址列表,多地址逗号分隔', `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50) NOT NULL COMMENT '账号', `password` varchar(50) NOT NULL COMMENT '密码', `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员', `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割', PRIMARY KEY (`id`), UNIQUE KEY `i_username` (`username`) USING BTREE ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE `xxl_job_lock` ( `lock_name` varchar(50) NOT NULL COMMENT '锁名称', PRIMARY KEY (`lock_name`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31'); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` (`lock_name`) VALUES ('schedule_lock'); commit;
修改 xxl-job-admin 项目当中的 appication.properties
配置把当中的数据库信息改为自己的数据库信息即可如下:
切换到 xxl-job-admin 目录当中打包,打包命令如下:
mvn clean -U package -Dmaven.test.skip=true
切换到 target 目录当中启动:
java -jar xxl-job-admin-2.3.0.jar --server.port=8088
启动之后再浏览器当中访问:http://localhost:8088/xxl-job-admin/toLogin 默认的用户名密码为:admin/123456。
调度任务执行流程
启动 xxl-job-admin 调度模块,直接运行,xxl-job-excuotr 需要配置 xxl-job-admin 的地址,主动向 xxl-job-admin 注册。
执行器操作
创建 SpringBoot 工程,利用初始化器创建不贴图。
导入依赖:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.0</version> </dependency> </dependencies>
修改配置文件添加 xxl-job 相关配置:
server: port: 8080 xxl: job: admin: addresses: http://localhost:8088/xxl-job-admin/ executor: appName: xxl-job logPath: D:\Log
创建配置类 XxlJobConfig.java:
/** * xxl-job配置 * * @author 30315 * @date 2021/10/04 */ @Component @Slf4j public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.executor.appName}") private String appName; @Value("${xxl.job.executor.logPath}") private String logPath; @Bean public XxlJobSpringExecutor xxlJobExecutor() { XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appName); xxlJobSpringExecutor.setLogPath(logPath); return xxlJobSpringExecutor; } }
在调度中心创建执行器与任务,在执行器管理当中创建执行器:
执行器的名称要和配置文件当中的 appName 名称一样:
然后在切换到任务管理,创建任务:
在程序当中创建任务:
/** * @author BNTang * @version 1.0 * @project xxl-job-project * @description * @since Created in 2021/10/4 004 20:52 **/ @Component @Slf4j public class MyJob { @XxlJob("myJobHandler") public ReturnT<String> execute() { log.info("myJobHandler execute!"); XxlJobHelper.log("myJobHandler execute!"); return ReturnT.SUCCESS; } }
启动任务,当然在启动任务的同时程序也要启动起来:
运行效果需要根据你任务的 Cron 表达式的时间来定啦,我这里不贴图你自行测试。
路径策略
就是一个负载均衡,当只有一个任务调度中心,有多个执行器的时候,任务调度中心要选择一个或多个执行器来执行任务。
对项目进行集群部署:
修改配置文件当中的端口号然后在启动一个项目即可搭建多个服务的集群,效果如下:
策略详解
一般使用 故障转移
,当一台机器出现故障时,转移到另一台机器上,这个故障转移的效果需要自己搭建一个集群的服务,然后,你把集群当中正在运行任务的服务关闭之后
xxl-job 会自行的进行转移到另外一台服务器上进行运行任务。
分片策略:
- XxlJobHelper.getShardIndex(); 获取当前调度器的序号
- XxlJobHelper.getShardTotal(); 获取总机器数量
修改 MyJob.java
/** * @author BNTang */ @Component @Slf4j public class MyJob { @XxlJob("myJobHandler") public ReturnT<String> execute() { List<Integer> users = Arrays.asList(1, 2, 3, 4, 5, 6); users.forEach(u -> { if (u % XxlJobHelper.getShardTotal() == XxlJobHelper.getShardIndex()) { log.info("发短信操作 >>> user={},index={},total={}", u, XxlJobHelper.getShardIndex(), XxlJobHelper.getShardTotal()); } }); return ReturnT.SUCCESS; } }
阻塞处理策略
- 单机串行
- 丢弃后续调度
- 覆盖之前调度
获取任务传递的,任务参数,代码如下:
/** * @author BNTang */ @Component @Slf4j public class MyJob { @XxlJob("myJobHandler") public ReturnT<String> execute() { log.info("myJobHandler execute..."); log.info("参数={}", XxlJobHelper.getJobParam()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } XxlJobHelper.log("myJobHandler execute..."); return new ReturnT<>(200, "【执行成功】" + new Date()); } }