Xxljob调度机制

简介: 时间轮算法,其实很简单,就是用实际的时钟刻度槽位来存储任务。时钟刻度可以更细致,比如把一天切分成246060个秒的刻度,秒的刻度上挂任务。

XxlJobScheduler

XxlJob调度类核心为:

com.xxl.job.admin.core.scheduler.XxlJobScheduler

由配置类:XxlJobAdminConfig 实例化并调用初始化方法 init 。初始化方法中,会启动各种监控线程,其中就包括调度线程。

JobScheduleHelper.getInstance().start();

JobScheduleHelper

Xxljob具体调度过程如下图所示
20230213-Xxljob调度机制-流程图.jpg

定时线程-scheduleThread

1、时间校对后5s
也许是为了:保证时间的准确度,一般sleep 4.* s ;以便后面的内容准点运行。

try {
    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
    if (!scheduleThreadToStop) {
        logger.error(e.getMessage(), e);
    }
}

2、预读取数量:preReadCount
相关配置参数为:

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

preReadCount计算公式为:

preReadCount = (FastMax + SlowMax) * 20

3、守护线程的循环
只要守护线程停止标志 : scheduleThreadToStop 为false,循环一直运行。守护线程自己内部的异常或者,主动调用toStop方法,可以将标志设置为true。
4、加锁
对表 xxl_job_lock 加锁,开始一个事务。

select * from xxl_job_lock where lock_name = 'schedule_lock' for update

这也是xxl-job的一个缺陷,集群部署保证高可用;但是锁的原因,却无法扩展调度能力。

5、预读取任务
查询表xxl_job_info,预读取当前批次需要执行的任务。以当前ms时间 nowTime 为基准,未来5s需要执行的任务,按批次preReadCount读取到内存中。
其中涉及到两个参数:
● pagesize 就是preReadCount,表示一个批次读取多少任务到内存中
● maxNextTime 表示最大的调度时间。表xxl_job_info中,始终维护着一个字段 trigger_next_time 也就是该任务的下次调度时间 。
如果读取到任务列表为空或者长度为0,设置一个预读取标志为false;该标志默认为true。

6、逐一调度任务
(1)任务过期大于5s
如果当前时间 大于 当前任务的下一次调度时间 + 5s,即任务过期5s。
日志告警任务失效,并更新任务的下一次调度时间。
也可以设置任务过期策略,也可以让任务尝试执行。相关字段: xxl_job_info.misfire_strategy 。
(2)任务过期小于5s
如果当前时间 大于 当前任务的下一次调度时间,即当前任务过期时间小于5s。
生成任务线程,放入任务线程池。并更新任务的下一次调度时间。
(3)任务未过期
将任务通过ringData,扔给时间轮线程处理。

7、批量更新任务
这里主要是根据任务ID,更新任务的下一次调度时间。

8、批次收尾
关闭连接资源,释放锁。(第4步加锁的)

9、准备下一轮调度
如果本次调度时间小于1s,需要让守护线程sleep一段时间之后,再进入下一轮调度。
这个时候,调度标志 preReadSuc 就起作用了。
preReadSuc=true 本轮有调度任务,每秒都扫描。
preReadSuc=false 本轮没有调度任务,等待5s进入一下轮调度。

时间轮线程-ringThread

时间轮 ringThread 也是一个守护线程。字段 ringThreadToStop 控制时间轮线程是否继续。
1、时间校对下一秒

try {
    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
    if (!ringThreadToStop) {
        logger.error(e.getMessage(), e);
    }
}

2、当前时间轮获取任务
获取当前时间的秒

int nowSecond = Calendar.getInstance().get(Calendar.SECOND);

避免处理耗时太长,跨过刻度,向前校验一个刻度;

for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}

注意:
(1)循环两次,表示获取nowSecond 和 nowSecond - 1 两秒内的任务ID。
(2)写法 (nowSecond+60-i)%60 是为了保证时间秒在 0 ~ 59 之间。
(3)任务获取,并从容器中清除;避免重复执行。
3、逐一调度任务
生成任务线程,放入任务线程池。

执行线程池-fastTriggerPool

执行线程池中的任务,主要经历一下执行步骤。
1、根据jobId查询Job的信息。
2、根据Job信息中的执行器ID,查询执行器信息。
3、重试机制处理。
4、任务起始日志,插入数据库。
5、调度任务触发执行器。
6、任务结束日志,更新数据库。

相关文章
|
存储 Java BI
XXL-JOB定时任务知识点和应用实例
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。该处只是介绍xxl_job的一下基础知识和使用的实例,具体的安装调试请参照对应的最新的官方文档,中文开源地址:https://www.xuxueli.com/xxl-job
4214 0
|
监控 Java 调度
分布式系列教程(17) - 分布式任务调度平台XXL-JOB
分布式系列教程(17) - 分布式任务调度平台XXL-JOB
611 0
|
存储 监控 Java
10分钟3个步骤集成使用SkyWalking
此时就非常推荐SkyWalking了,SkyWalking不仅仅是一款链路跟踪工具,还可以作为一个系统监控工具,还具有告警功能。使用简便、上手又快。真可谓快、准、狠。
10分钟3个步骤集成使用SkyWalking
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
1021 0
|
6月前
|
存储 关系型数据库 MySQL
阿里面试:MySQL 一个表最多 加几个索引? 6个?64个?还是多少?
阿里面试:MySQL 一个表最多 加几个索引? 6个?64个?还是多少?
阿里面试:MySQL 一个表最多 加几个索引? 6个?64个?还是多少?
|
消息中间件 数据可视化 NoSQL
XXL-Job搭建(传统方式&Docker方式)与使用(Linux环境下)
XXL-Job搭建(传统方式&Docker方式)与使用(Linux环境下)
6898 0
XXL-Job搭建(传统方式&Docker方式)与使用(Linux环境下)
|
8月前
|
存储 监控 Java
G1原理—7.G1的GC日志分析解读
本文进行了TLAB的GC日志解读、YGC的GC日志解读、模拟YGC(单次GC及多次GC的不同场景)、打开实验选项查看YGC的详情日志信息、Mixed GC日志信息之初始标记过程、Mixed GC日志信息之混合回收过程。
|
10月前
|
Java 中间件 调度
SpringBoot整合XXL-JOB【03】- 执行器的使用
本文介绍了如何将调度中心与项目结合,通过配置“执行器”实现定时任务控制。首先新建SpringBoot项目并引入依赖,接着配置xxl-job相关参数,如调度中心地址、执行器名称等。然后通过Java代码将执行器注册为Spring Bean,并声明测试方法使用`@XxlJob`注解。最后,在调度中心配置并启动定时任务,验证任务是否按预期执行。通过这些步骤,读者可以掌握Xxl-Job的基本使用,专注于业务逻辑的编写而无需关心定时器本身的实现。
2385 10
SpringBoot整合XXL-JOB【03】-  执行器的使用
|
负载均衡 Java 调度
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
|
人工智能 分布式计算 Java
说说XXLJob分片任务实现原理?
说说XXLJob分片任务实现原理?
1859 0
说说XXLJob分片任务实现原理?