Xxljob调度机制

简介: 时间轮算法,其实很简单,就是用实际的时钟刻度槽位来存储任务。时钟刻度可以更细致,比如把一天切分成246060个秒的刻度,秒的刻度上挂任务。

XxlJobScheduler

XxlJob调度类核心为:

com.xxl.job.admin.core.scheduler.XxlJobScheduler

由配置类:XxlJobAdminConfig 实例化并调用初始化方法 init 。初始化方法中,会启动各种监控线程,其中就包括调度线程。

JobScheduleHelper.getInstance().start();

JobScheduleHelper

Xxljob具体调度过程如下图所示
20230213-Xxljob调度机制-流程图.jpg

定时线程-scheduleThread

1、时间校对后5s
也许是为了:保证时间的准确度,一般sleep 4.* s ;以便后面的内容准点运行。

try {
    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
    if (!scheduleThreadToStop) {
        logger.error(e.getMessage(), e);
    }
}

2、预读取数量:preReadCount
相关配置参数为:

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

preReadCount计算公式为:

preReadCount = (FastMax + SlowMax) * 20

3、守护线程的循环
只要守护线程停止标志 : scheduleThreadToStop 为false,循环一直运行。守护线程自己内部的异常或者,主动调用toStop方法,可以将标志设置为true。
4、加锁
对表 xxl_job_lock 加锁,开始一个事务。

select * from xxl_job_lock where lock_name = 'schedule_lock' for update

这也是xxl-job的一个缺陷,集群部署保证高可用;但是锁的原因,却无法扩展调度能力。

5、预读取任务
查询表xxl_job_info,预读取当前批次需要执行的任务。以当前ms时间 nowTime 为基准,未来5s需要执行的任务,按批次preReadCount读取到内存中。
其中涉及到两个参数:
● pagesize 就是preReadCount,表示一个批次读取多少任务到内存中
● maxNextTime 表示最大的调度时间。表xxl_job_info中,始终维护着一个字段 trigger_next_time 也就是该任务的下次调度时间 。
如果读取到任务列表为空或者长度为0,设置一个预读取标志为false;该标志默认为true。

6、逐一调度任务
(1)任务过期大于5s
如果当前时间 大于 当前任务的下一次调度时间 + 5s,即任务过期5s。
日志告警任务失效,并更新任务的下一次调度时间。
也可以设置任务过期策略,也可以让任务尝试执行。相关字段: xxl_job_info.misfire_strategy 。
(2)任务过期小于5s
如果当前时间 大于 当前任务的下一次调度时间,即当前任务过期时间小于5s。
生成任务线程,放入任务线程池。并更新任务的下一次调度时间。
(3)任务未过期
将任务通过ringData,扔给时间轮线程处理。

7、批量更新任务
这里主要是根据任务ID,更新任务的下一次调度时间。

8、批次收尾
关闭连接资源,释放锁。(第4步加锁的)

9、准备下一轮调度
如果本次调度时间小于1s,需要让守护线程sleep一段时间之后,再进入下一轮调度。
这个时候,调度标志 preReadSuc 就起作用了。
preReadSuc=true 本轮有调度任务,每秒都扫描。
preReadSuc=false 本轮没有调度任务,等待5s进入一下轮调度。

时间轮线程-ringThread

时间轮 ringThread 也是一个守护线程。字段 ringThreadToStop 控制时间轮线程是否继续。
1、时间校对下一秒

try {
    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
    if (!ringThreadToStop) {
        logger.error(e.getMessage(), e);
    }
}

2、当前时间轮获取任务
获取当前时间的秒

int nowSecond = Calendar.getInstance().get(Calendar.SECOND);

避免处理耗时太长,跨过刻度,向前校验一个刻度;

for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}

注意:
(1)循环两次,表示获取nowSecond 和 nowSecond - 1 两秒内的任务ID。
(2)写法 (nowSecond+60-i)%60 是为了保证时间秒在 0 ~ 59 之间。
(3)任务获取,并从容器中清除;避免重复执行。
3、逐一调度任务
生成任务线程,放入任务线程池。

执行线程池-fastTriggerPool

执行线程池中的任务,主要经历一下执行步骤。
1、根据jobId查询Job的信息。
2、根据Job信息中的执行器ID,查询执行器信息。
3、重试机制处理。
4、任务起始日志,插入数据库。
5、调度任务触发执行器。
6、任务结束日志,更新数据库。

相关文章
|
2月前
|
Java 调度
利用 XXL-JOB 实现灵活控制的分片处理
本文讲述了一种利用 XXL-JOB 来进行分片任务处理的方法,另外加入对执行节点数的灵活控制。
47 2
|
运维 监控 算法
从定时任务-到任务调度系统xxl-job
定时任务的今生前世以及xxl-job调度系统
2501 0
从定时任务-到任务调度系统xxl-job
|
4月前
|
存储 监控 算法
XXL-JOB内部机制大揭秘:让任务调度飞起来
【8月更文挑战第14天】在大数据时代,高效的任务调度系统是支撑业务稳定运行与快速迭代的基石。XXL-JOB,作为一款轻量级、分布式任务调度平台,凭借其灵活的配置、强大的扩展性和高可用特性,在众多任务调度框架中脱颖而出。今天,我们就来深入揭秘XXL-JOB的内部机制,看看它是如何让任务调度“飞起来”的。
287 0
|
7月前
|
负载均衡 Java 调度
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
|
7月前
|
Java 应用服务中间件 调度
xxl-job任务调度2.0.2升级到2.3.0版本,执行器改造过程中经验总结
xxl-job任务调度2.0.2升级到2.3.0版本,执行器改造过程中经验总结
586 0
|
运维 监控 Java
分布式任务处理:XXL-JOB分布式任务调度框架(一)
分布式任务处理:XXL-JOB分布式任务调度框架
334 0
|
Java 调度 Maven
分布式任务处理:XXL-JOB分布式任务调度框架(二)
分布式任务处理:XXL-JOB分布式任务调度框架
308 0
|
缓存 算法 Java
分布式任务处理:XXL-JOB分布式任务调度框架(三)
分布式任务处理:XXL-JOB分布式任务调度框架
837 0
|
调度
|
Java 数据库连接 API
一文搞懂Executor执行器和线程池的关系,整体介绍其任务执行/调度体系:ThreadPoolExecutor、ScheduledExecutorService(上)
一文搞懂Executor执行器和线程池的关系,整体介绍其任务执行/调度体系:ThreadPoolExecutor、ScheduledExecutorService(上)
一文搞懂Executor执行器和线程池的关系,整体介绍其任务执行/调度体系:ThreadPoolExecutor、ScheduledExecutorService(上)

相关实验场景

更多