从定时任务-到任务调度系统
定时任务在我们开发中特别的常见,比如凌晨备份数据、统计数据,或者自动取消未支付的订单等都需要借助定时任务来完成。
定时任务的今生前世
Top 1:线程任务
class MyTimerThread extends Thread {
@Override
public void run() {
while(true) {
try {
Thread.sleep(60*1000);
//每隔1分钟需要执行的任务
doTask();
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
线程任务的缺点
如果定时任务多的话,没有办法分配CPU占比,只能由系统自动分配,因此有可能会在某段时间因为定时任务频繁抢占CPU而导致主线程阻塞.用线程池的话可以确保主线程能够抢到CPU,其他定时任务只占用空闲的CPU.不同的服务器CPU性能不一样,直接调节线程池相关参数就可以灵活调整CPU配比。
Top 2:Timer
Timer 是 JDK 自带的定时任务执行类, jdk1.3以后
存在schedule和scheduleAtFixedRate两套不同调度算法的方法, 它们的共同点是若判断理论执行时间小于实际执行时间时,都会马上执行任务,区别在于计算下一次执行时间的方式不同:
schedule: 任务开始的时间 + period(时间片段),强调“固定间隔”地执行任务
scheduleAtFixedRate: 参数设定开始的时间 + period(时间片段),强调“固定频率”地执行任务
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("Run timerTask:" + new Date());
}
};
// 计时器
Timer timer = new Timer();
// 添加执行任务(延迟 1s 执行,每 3s 执行一次)
timer.schedule(timerTask, 1000, 3000);
程序执行结果如下:
Run timerTask:Mon Jul 19 23:09:16 CST 2021
Run timerTask:Mon Jul 19 23:09:19 CST 2021
Run timerTask:Mon Jul 19 23:09:22 CST 2021
Timer 缺点分析
问题1:当一个任务的执行时间过长时,会影响其他任务的调度
如下代码所示:
// 定义任务 1
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("进入 timerTask 1:" + new Date());
try {
// 休眠 5 秒
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Run timerTask 1:" + new Date());
}
};
// 定义任务 2
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
System.out.println("Run timerTask 2:" + new Date());
}
};
// 计时器
Timer timer = new Timer();
// 添加执行任务(延迟 1s 执行,每 3s 执行一次)
timer.schedule(timerTask, 1000, 3000);
timer.schedule(timerTask2, 1000, 3000);
程序执行结果如下:
进入 timerTask 1:Mon Aug 17 21:44:08 CST 2020Run timerTask 1:Mon Aug 17 21:44:13 CST 2020
Run timerTask 2:Mon Aug 17 21:44:13 CST 2020
进入 timerTask 1:Mon Aug 17 21:44:13 CST 2020
Run timerTask 1:Mon Aug 17 21:44:18 CST 2020
进入 timerTask 1:Mon Aug 17 21:44:18 CST 2020
Run timerTask 1:Mon Aug 17 21:44:23 CST 2020
Run timerTask 2:Mon Aug 17 21:44:23 CST 2020
进入 timerTask 1:Mon Aug 17 21:44:23 CST 2020
从上述结果中可以看出,当任务 1 运行时间超过设定的间隔时间时,任务 2 也会延迟执行。 原本任务 1 和任务 2 的执行时间间隔都是 3s,但因为任务 1 执行了 5s,因此任务 2 的执行时间间隔也变成了 10s(和原定时间不符)。
问题2:使用 Timer 类实现定时任务时,当一个任务抛出异常,其他任务也会终止运行
如下代码所示:
// 定义任务 1
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("进入 timerTask 1:" + new Date());
// 模拟异常
int num = 8 / 0;
System.out.println("Run timerTask 1:" + new Date());
}
};
// 定义任务 2
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
System.out.println("Run timerTask 2:" + new Date());
}
};
// 计时器
Timer timer = new Timer();
// 添加执行任务(延迟 1s 执行,每 3s 执行一次)
timer.schedule(timerTask, 1000, 3000);
timer.schedule(timerTask2, 1000, 3000);
程序执行结果如下:
进入 timerTask 1:Mon Aug 17 22:02:37 CST 2020Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero
at com.example.MyTimerTask$1.run(MyTimerTask.java:21)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Process finished with exit code 0
为什么会出现这个情况呢,我们查看源码:
public class Timer {
// 任务队列
private final TaskQueue queue = new TaskQueue();
// 处理线程
private final TimerThread thread = new TimerThread(queue);
Timer被设计成支持多个定时任务,通过源码发现它有一个任务队列用来存放这些定时任务,并且启动了一个线程来处理,单线程要处理多任务的时候就会出现问题,
- 若任务B执行时间过长,将导致任务A延迟了启动时间!
- 若任务线程在执行队列中某个任务时,该任务抛出异常,将导致线程因跳出循环体而终止,即Timer停止了工作!
Timer 小结
Timer 类实现定时任务的优点是方便,因为它是 JDK 自定的定时任务,但缺点是任务如果执行时间太长或者是任务执行异常,会影响其他任务调度,所以在生产环境下建议谨慎使用。
TOP 3:ScheduledExecutorService
ScheduledExecutorService 也是jdk自带的出自于1.5以后,可以实现 Timer 类具备的所有功能,并且它可以解决了 Timer 类存在的所有问题
基本使用:
// 创建任务队列
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
// 10 为线程数量
// 执行任务
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("Run Schedule:" + new Date());
}, 1, 3, TimeUnit.SECONDS);
// 1s 后开始执行,每 3s 执行一次
之前出现的两种异常就不再演示了,使用ScheduledExecutorService 来执行任务,不会造成任务间的相互影响。
为什么不会任务间不会互相影响了呢?
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以本质上说ScheduledThreadPoolExecutor还是一个线程池,特殊的地方在于它实现了自己的工作队列DelayedWorkQueue,该任务队列的作用是按照一定顺序对队列中的任务进行排序。比如,按照距离下次执行时间的长短的升序方式排列,让需要尽快执行的任务排在队首,“不那么着急”的任务排在队列后方,从而方便线程获取到“应该”被执行的任务。除此之外,ScheduledThreadPoolExecutor还在任务执行结束后,计算出下次执行的时间,重新放到工作队列中,等待下次调用。
任务如果出现异常就停止了调度不会影响到其他任务的调度。
那要是将corePoolSize设置为1,变成单线程跑呢?
结果当然是和Timer一样,任务B会导致任务A延迟执行,不过比较好的是任务C抛异常不会影响到其他任务的调度。
TOP 4:Spring Task
使用上面两种定时任务的实现方式,很难实现设定了具体时间的定时任务,比如当我们需要每周五来执行某项任务时
Spring Task就可轻松的实现此需求。
以 Spring Boot 为例,实现定时任务只需两步:
- 开启定时任务。
- 添加定时任务。
在Application上添加注解@EnableScheduling
// 添加定时任务
// cron 表达式,每周五 23:59:59 执行
@Scheduled(cron = "59 59 23 0 0 5")
public void doTask(){
System.out.println("我是定时任务~");
}
Spring Task 是否会遇到之前出现的两种问题呢?
答案是会的,SpringBoot使用@scheduled定时执行任务的时候是在一个单线程中,如果有多个任务,其中一个任务执行时间过长,则有可能会导致其他后续任务被阻塞直到该任务执行完成。
@Scheduled(cron = "0/1 * * * * ? ")
public void deleteFile() throws InterruptedException {
log.info("111delete success, time:" + new Date().toString());
Thread.sleep(1000 * 5);//模拟长时间执行,比如IO操作,http请求
}
@Scheduled(cron = "0/1 * * * * ? ")
public void syncFile() {
log.info("222sync success, time:" + new Date().toString());
}
@Scheduled(cron = "0/1 * * * * ? ")
public void doTask() {
log.info("333sync success, time:" + new Date().toString());
throw new RuntimeException("异常 出错了");
}
/**输出如下:
[pool-1-thread-1] : 111delete success, time:Mon Nov 26 20:42:13 CST 2018
[pool-1-thread-1] : 222sync success, time:Mon Nov 26 20:42:18 CST 2018
[pool-1-thread-1] : 111delete success, time:Mon Nov 26 20:42:19 CST 2018
[pool-1-thread-1] : 222sync success, time:Mon Nov 26 20:42:24 CST 2018
[pool-1-thread-1] : 222sync success, time:Mon Nov 26 20:42:25 CST 2018
[pool-1-thread-1] : 111delete success, time:Mon Nov 26 20:42:25 CST 2018
上面的日志中可以明显的看到syncFile被阻塞了,直达deleteFile执行完它才执行了
而且从日志信息中也可以看出@Scheduled是使用了一个线程池中的一个单线程来执行所有任务的。
**/
/**如果把Thread.sleep(1000*5)注释了,输出如下:
[pool-1-thread-1]: 111delete success, time:Mon Nov 26 20:48:04 CST 2018
[pool-1-thread-1]: 222sync success, time:Mon Nov 26 20:48:04 CST 2018
[pool-1-thread-1]: 222sync success, time:Mon Nov 26 20:48:05 CST 2018
[pool-1-thread-1]: 111delete success, time:Mon Nov 26 20:48:05 CST 2018
[pool-1-thread-1]: 111delete success, time:Mon Nov 26 20:48:06 CST 2018
[pool-1-thread-1]: 222sync success, time:Mon Nov 26 20:48:06 CST 2018
这下正常了
**/
这里定时任务出现异常并不会终止后续的任务进行,相比Timer 而言就好很多了。
解决办法:
1.将@Scheduled注释的方法内部改成异步执行
如下:
//构建一个合理的线程池也是一个关键,否则提交的任务也会在自己构建的线程池中阻塞
ExecutorService service = Executors.newFixedThreadPool(5);
@Scheduled(cron = "0/1 * * * * ? ")
public void deleteFile() {
service.execute(() -> {
log.info("111delete success, time:" + new Date().toString());
try {
Thread.sleep(1000 * 5);//改成异步执行后,就算你再耗时也不会印象到后续任务的定时调度了
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Scheduled(cron = "0/1 * * * * ? ")
public void syncFile() {
service.execute(()->{
log.info("222sync success, time:" + new Date().toString());
});
}
2.把Scheduled配置成成多线程执行
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
/**为什么这么说呢?
假设你有4个任务需要每隔1秒执行,而其中三个都是比较耗时的操作可能需要10多秒,而你上面的语句是这样写的:
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(3));
那么仍然可能导致最后一个任务被阻塞不能定时执行
**/
}
}
xxl-job 任务调度系统
基本介绍
项目开发中,常常以下场景需要分布式任务调度:
- 同一服务多个实例的任务存在互斥时,需要统一协调
- 定时任务的执行需要支持高可用、监控运维、故障告警
- 需要统一管理和追踪各个服务节点定时任务的运行情况,以及任务属性信息,例如任务所属服务、所属责任人
因此,XXL-JOB应运而生: XXL-JOB是一个开源的轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展、开箱即用,其中“XXL”是主要作者,大众点评许雪里名字的缩写
自2015年开源以来,已接入数百家公司的线上产品线,接入场景涉及电商业务,O2O业务和大数据作业等
功能特性
主要功能特性如下:
- 简单灵活 提供Web页面对任务进行管理,管理系统支持用户管理、权限控制; 支持容器部署; 支持通过通用HTTP提供跨平台任务调度;
- 丰富的任务管理功能 支持页面对任务CRUD操作; 支持在页面编写脚本任务、命令行任务、Java代码任务并执行; 支持任务级联编排,父任务执行结束后触发子任务执行; 支持设置任务优先级; 支持设置指定任务执行节点路由策略,包括轮询、随机、广播、故障转移、忙碌转移等; 支持Cron方式、任务依赖、调度中心API接口方式触发任务执行
- 高性能 调度中心基于线程池多线程触发调度任务,快任务、慢任务基于线程池隔离调度,提供系统性能和稳定性; 任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰;
- 高可用 任务调度中心、任务执行节点均 集群部署,支持动态扩展、故障转移 支持任务配置路由故障转移策略,执行器节点不可用是自动转移到其他节点执行 支持任务超时控制、失败重试配置 支持任务处理阻塞策略:调度当任务执行节点忙碌时来不及执行任务的处理策略,包括:串行、抛弃、覆盖策略
- 易于监控运维 支持设置任务失败邮件告警,预留接口支持短信、钉钉告警; 支持实时查看任务执行运行数据统计图表、任务进度监控数据、任务完整执行日志;
系统设计
1 设计思路
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求; 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑; 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
2 系统组成
- 调度模块(调度中心): 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块; 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover
- 执行模块(执行器): 负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i5St9Ifg-1628246352610)(/Users/jesuscc/Jesuscc/Img/learn/xxj-job架构图.png)]
3 工作原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4FI1fUIJ-1628246352612)(/Users/jesuscc/Jesuscc/Img/learn/xxl-job任务执行流程.png)]
- 任务执行器根据配置的调度中心的地址,自动注册到调度中心
- 达到任务触发条件,调度中心下发任务
- 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
- 执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心
- 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
4 HA设计
4.1 调度中心高可用
调度中心支持多节点部署,基于数据库行锁保证同时只有一个调度中心节点触发任务调度,参考com.xxl.job.admin.core.thread.JobScheduleHelper#start
Connection conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
# 触发任务调度
# 事务提交
conn.commit();
复制代码
4.2 任务调度高可用
- 路由策略 调度中心基于路由策略路由选择一个执行器节点执行任务,XXL-JOB提供了如下路由策略保证任务调度高可用: 忙碌转移策略: 下发任务前向执行器节点发起rpc心跳请求查询是否忙碌,如果执行器节点返回忙碌则转移到其他执行器节点执行(参考 com.xxl.job.admin.core.route.strategy.ExecutorRouteBusyover) 故障转移策略: 下发任务前向执行器节点发起rpc心跳请求查询是否在线,如果执行器节点没返回或者返回不可用则转移到其他执行器节点执行 (参考com.xxl.job.admin.core.route.strategy.ExecutorRouteFailover)
- 阻塞处理策略 当执行器节点存在多个相同任务id的任务未执行完成,则需要基于阻塞策略对任务进行取舍: 串行策略:默认策略,任务进行排队、丢弃旧任务策略、丢弃新任务策略 (参考:com.xxl.job.core.biz.impl.ExecutorBizImpl#run)
特性 | quartz | elastic-job-lite | xxl-job | LTS |
---|---|---|---|---|
依赖 | MySQL、jdk | jdk、zookeeper | mysql、jdk | jdk、zookeeper、maven |
高可用 | 多节点部署,通过竞争数据库锁来保证只有一个节点执行任务 | 通过zookeeper的注册与发现,可以动态的添加服务器 | 基于竞争数据库锁保证只有一个节点执行任务,支持水平扩容。可以手动增加定时任务,启动和暂停任务,有监控 | 集群部署,可以动态的添加服务器。可以手动增加定时任务,启动和暂停任务。有监控 |
任务分片 | × | √ | √ | √ |
管理界面 | × | √ | √ | √ |
难易程度 | 简单 | 简单 | 简单 | 略复杂 |
高级功能 | - | 弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持 | 弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化 | 支持spring,spring boot,业务日志记录器,SPI扩展支持,故障转移,节点监控,多样化任务执行结果支持,FailStore容错,动态扩容。 |
5.源码浅析
设计思想:xxl-job通过在调度中心操作各个执行器的调度方法,使用http作为rpc,调度执行执行器的业务,从而使得业务和调度相对隔离。
- 调度中心启动过程?
- 执行器启动过程?
- 执行器如何注册到调度中心?
- 调度中心怎么调用执行器?
5.1 调度中心启动过程?
XxlJobAdminConfig.java
// 1. 加载 adminConfig = this
// xxlJobScheduler = new XxlJobScheduler();
// xxlJobScheduler.init();
// 启动过程代码
public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// init i18n
// 1.初始化国际化
initI18n();
// admin trigger pool start
// 2.启动触发器线程池
JobTriggerPoolHelper.toStart();
// admin registry monitor run
// 3.启动注册监控器(将注册到register表中的IP加载到group表)/ 30s执行一次
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
// 4. 启动失败日志监控器(失败重试,失败邮件发送)
// 4.1 查找失败日志
// 4.2 重试调度
// 4.3 发送失败邮件
// 4.4 更新数据库
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
// 5.启动任务丢失监控( 任务处于"运行中"超过十分钟且且对应执行器心跳注册失败不在线)
JobCompleteHelper.getInstance().start();
// admin log report start
// 6.启动日志报告监控
// 6.1统计每日调度结果(处理3天内的成功与否的日志)首页的统计图 / 1min执行一次
// 6.2清理日志(数据库) 配置文件中xxl.job.logretentiondays设置的天数
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
// 7. 启动定时任务调度器(执行任务,缓存任务)
// 7.1 计划线程[缓存任务] (如果执行成功则每秒都执行,执行失败则跳过当前秒)
// 7.1.1 查询5s内要运行的定时任务
// ¬ ①如果已经超过设定时间5秒了,执行过期策略并计算下次运行时间
// ¬ ②如果时间到了且没有超过五秒,则直接开启任务并计算下次运行时间
// 如果当前任务还在运行中,且下次的触发时间不足5s,则将任务放入一个队列中,计算下次触发时间
// ¬ ③时间还没到的任务 5-10秒内的 将任务放入队列 计算下次触发时间
// 7.2 执行线程[执行任务] (1s执行一次)
// 7.2.1 查找当前秒和前一秒 队列里的任务执行触发器(避免处理耗时太长,跨过刻度)
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
... ...
}
5.2 执行器启动过程?
XxlJobConfig.java
//初始化 XxlJobSpringExecutor.java
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
//初始化定时任务方法仓库
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
//扫描初始化执行器的方法,并存储在ConcurrentMap里面,方便调用
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
//获取所有注入到spring的bean的类
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
//获取这个bean下 所有带有@XxlJob方法
Map<Method, XxlJob> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
}
XxlJobExecutor开始和调度中心进行注册,包含一些端口等信息,同时开启一些线程,进行其他业务等。
// XxlJobExecutor.class
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
// 将多个调度中心地址存储在list里,后面会用到
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
// 启动日志文件清理线程 (一天清理一次)
// 任务日志文件保留时间 对应配置文件 xxl.job.executor.logretentiondays
// 每天清理一次过期日志,配置参数必须大于3才有效
//清理的流程是 起一个新的线程 清理目录,根据名字日期格式化来判断
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
// 启动一个守护线程 如果回调的队列里有数据 就执行结果异步上报
// 上报采用的是 根据step2 设置的地址 通过http的形式来请求调度中心的接口
// 调度中心的接口功能:存储执行器结果日志
TriggerCallbackThread.getInstance().start();
// init executor-server
//初始化netty,绑定端口,
// 注册到调度中心,采用netty进行业务http交互,监控调度中心的调度请求,并分配到特定的执行通道进行执行 30秒一次
initEmbedServer(address, ip, port, appname, accessToken);
}
5.3执行器如何注册到调度中心?
执行器
// 注册执行器入口
XxlJobExecutor.java->start();
//加载调度中心地址
XxlJobExecutor.java->initAdminBizList();
//启动调度的服务
XxlJobExecutor.java ->initEmbedServer();
// 执行注册
ExecutorRegistryThread.java->start();
// RPC 注册代码
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
调度中心
// RPC 注册服务
AdminBizImpl.java->registry();
//操作数据库存储执行器信息数据 存储在数据库xxl_job_registry表中
5.4调度中心怎么调用执行器?
/* 调度中心执行步骤 */
// 1. 调用执行器
XxlJobTrigger.java->runExecutor();
// 2. 获取执行器
XxlJobScheduler.java->getExecutorBiz();
// 3. 调用
ExecutorBizImpl.java->run();
/* 执行器执行步骤 */
// 1. 执行器接口
ExecutorBiz.java->run();
// 2. 执行器实现
ExecutorBizImpl.java->run();
// 3. 把jobInfo 从 jobThreadRepository (ConcurrentMap) 中获取一个新线程,并开启新线程
XxlJobExecutor.java->registJobThread();
// 4. 保存到当前线程队列
JobThread.java->pushTriggerQueue();
// 5. 执行
JobThread.java->handler.execute(triggerParam.getExecutorParams());
调度中心(Admin)
实现 org.springframework.beans.factory.InitializingBean类,重写 afterPropertiesSet 方法,在初始化bean的时候都会执行该方法
DisposableBean spring停止时执行
结束加载项
- 停止定时任务调度器(中断scheduleThread,中断ringThread)
- 停止触发线程池(JobTriggerPoolHelper)
- 停止注册监控器(registryThread)
- 停止失败日志监控器(monitorThread)
- 停止RPC服务(stopRpcProvider)
手动执行方式
JobInfoController.java
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
return ReturnT.SUCCESS;
}
定时调度策略
调度策略执行图
调度策略源码
JobScheduleHelper.java->start();
路由策略
第一个
固定选择第一个机器
ExecutorRouteFirst.java->route();
最后一个
固定选择最后一个机器
ExecutorRouteLast.java->route();
轮询
随机选择在线的机器
ExecutorRouteRound.java->route();
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// count++
Integer count = routeCountEachJob.get(jobId);
count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力
routeCountEachJob.put(jobId, count);
return count;
}
随机
随机获取地址列表中的一个
ExecutorRouteRandom.java->route();
一致性HASH
一个job通过hash算法固定使用一台机器,且所有任务均匀散列在不同机器
ExecutorRouteConsistentHash.java->route();
public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
// 取出键值 >= jobHash
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
最不经常使用
使用频率最低的机器优先被选举
把地址列表加入到内存中,等下次执行时剔除无效的地址,判断地址列表中执行次数最少的地址取出
频率、次数
ExecutorRouteLFU.java->route();
public String route(int jobId, List<String> addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
}
// put new
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
// 0-n随机数,包括0不包括n
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}
/*********************** 优化 START ***********************/
// 优化 remove old部分
Iterator<String> iterable = lfuItemMap.keySet().iterator();
while (iterable.hasNext()) {
String address = iterable.next();
if (!addressList.contains(address)) {
iterable.remove();
}
}
/*********************** 优化 START ***********************/
// load least userd count address
// 从小到大排序
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey();
}
最近最久未使用
最久未使用的机器优先被选举
用链表的方式存储地址,第一个地址使用后下次该任务过来使用第二个地址,依次类推(PS:有点类似轮询策略)
与轮询策略的区别:
- 轮询策略是第一次随机找一台机器执行,后续执行会将索引加1取余
- 轮询策略依赖 addressList 的顺序,如果这个顺序变了,索引到下一次的机器可能不是期望的顺序
- LRU算法第一次执行会把所有地址加载进来并缓存,从第一个地址开始执行,即使 addressList 地址顺序变了也不影响
次数
ExecutorRouteLRU.java->route();
public String route(int jobId, List<String> addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* a、accessOrder:ture=访问顺序排序(get/put时排序);false=插入顺序排期;
* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
*/
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}
/*********************** 举个例子 START ***********************/
// 如果accessOrder为true的话,则会把访问过的元素放在链表后面,放置顺序是访问的顺序
// 如果accessOrder为flase的话,则按插入顺序来遍历
LinkedHashMap<String, String> lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(1, lruItem);
lruItem.put("192.168.0.1", "192.168.0.1");
lruItem.put("192.168.0.2", "192.168.0.2");
lruItem.put("192.168.0.3", "192.168.0.3");
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
System.out.println(eldestValue + ": " + lruItem);
eldestKey = lruItem.entrySet().iterator().next().getKey();
eldestValue = lruItem.get(eldestKey);
System.out.println(eldestValue + ": " + lruItem);
// 输出结果:
192.168.0.1: {192.168.0.2=192.168.0.2, 192.168.0.3=192.168.0.3, 192.168.0.1=192.168.0.1}
192.168.0.2: {192.168.0.3=192.168.0.3, 192.168.0.1=192.168.0.1, 192.168.0.2=192.168.0.2}
/*********************** 举个例子 END ***********************/
// put new
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
故障转移
按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
ExecutorRouteFailover.java->route();
忙碌转移
按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
ExecutorRouteBusyover.java->route();
分片广播
广播触发对应集群中所有机器执行一次任务,同时传递分片参数;可根据分片参数开发分片任务
阻塞处理策略
为了解决执行线程因并发问题、执行效率慢、任务多等原因而做的一种线程处理机制,主要包括 串行、丢弃后续调度、覆盖之前调度,一般常用策略是串行机制
ExecutorBlockStrategyEnum.java
SERIAL_EXECUTION("Serial execution"), // 串行
DISCARD_LATER("Discard Later"), // 丢弃后续调度
COVER_EARLY("Cover Early"); // 覆盖之前调度
ExecutorBizImpl.java->run();
// executor block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
使用
快速上手
具体如何快速上手使用,官方文档:www.xuxueli.com/xxl-job/ 已经介绍得比较详细和清楚,不再赘述
注意事项
- 1 时钟同步问题 调度中心和任务执行器需要时间同步,同步时间误差需要在3分钟内,否则抛出异常 参考:com.xxl.rpc.remoting.provider.XxlRpcProviderFactory#invokeService
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
- 2 时区问题 任务由调度中心触发,按照在调度中心设置任务的cron表达式触发时,需要注意部署调度中心的机器所在的时区,按照该时区定制化cron表达式
- 3 任务执行中服务宕掉问题 调度中心完成任务下发,执行器在执行任务的过程中,如果执行器突然服务宕掉,会导致任务的执行问题在调度中心是执行中,调度中心并不会发起失败重试。即使任务设置了超时时间,执行器宕掉导致导致任务长时间未执行完成,调度中心界面也不会看到任务超时,因为任务超时是由执行器检测的并上报给调度中心的
因此遇到任务长时间未执行完成,可以关注是否发生了执行器突然服务宕掉
- 4 优雅停机问题 执行器执行任务基于线程池异步执行,当需要重启时需要注意线程池中还有未执行完成任务的问题,需要优雅停机,可以直接基于XxlJobExecutor.destroy()优雅停机,注意该方法在v2.0.2之前的版本存在bug导致无法优雅停机,v2.0.2及之后的版本才修复(参考:github.com/xuxueli/xxl…)
- 5 失败重试问题 当执行器节点部分服务不可用,例如节点磁盘损坏,但在调度中心仍然处于在线时,调度中心仍可能基于路由策略(包括故障转移策略)路由到该未下线的节点,并不断重试,不断失败,导致重试次数耗尽。所以路由策略尽量不要采用固定化策略,例如固定第一个、固定最后一个
Xxl-job总结
XXL-JOB上手还是比较简单,项目源码还是比较整洁,容易读懂,学习之后可以更加深入理解分布式系统设计、网络通信、多线程协同处理等知识点,推荐阅读