XxlJobScheduler
XxlJob调度类核心为:
com.xxl.job.admin.core.scheduler.XxlJobScheduler
由配置类:XxlJobAdminConfig 实例化并调用初始化方法 init 。初始化方法中,会启动各种监控线程,其中就包括调度线程。
JobScheduleHelper.getInstance().start();
JobScheduleHelper
Xxljob具体调度过程如下图所示
定时线程-scheduleThread
1、时间校对后5s
也许是为了:保证时间的准确度,一般sleep 4.* s ;以便后面的内容准点运行。
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
2、预读取数量:preReadCount
相关配置参数为:
## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
preReadCount计算公式为:
preReadCount = (FastMax + SlowMax) * 20
3、守护线程的循环
只要守护线程停止标志 : scheduleThreadToStop 为false,循环一直运行。守护线程自己内部的异常或者,主动调用toStop方法,可以将标志设置为true。
4、加锁
对表 xxl_job_lock 加锁,开始一个事务。
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
这也是xxl-job的一个缺陷,集群部署保证高可用;但是锁的原因,却无法扩展调度能力。
5、预读取任务
查询表xxl_job_info,预读取当前批次需要执行的任务。以当前ms时间 nowTime 为基准,未来5s需要执行的任务,按批次preReadCount读取到内存中。
其中涉及到两个参数:
● pagesize 就是preReadCount,表示一个批次读取多少任务到内存中
● maxNextTime 表示最大的调度时间。表xxl_job_info中,始终维护着一个字段 trigger_next_time 也就是该任务的下次调度时间 。
如果读取到任务列表为空或者长度为0,设置一个预读取标志为false;该标志默认为true。
6、逐一调度任务
(1)任务过期大于5s
如果当前时间 大于 当前任务的下一次调度时间 + 5s,即任务过期5s。
日志告警任务失效,并更新任务的下一次调度时间。
也可以设置任务过期策略,也可以让任务尝试执行。相关字段: xxl_job_info.misfire_strategy 。
(2)任务过期小于5s
如果当前时间 大于 当前任务的下一次调度时间,即当前任务过期时间小于5s。
生成任务线程,放入任务线程池。并更新任务的下一次调度时间。
(3)任务未过期
将任务通过ringData,扔给时间轮线程处理。
7、批量更新任务
这里主要是根据任务ID,更新任务的下一次调度时间。
8、批次收尾
关闭连接资源,释放锁。(第4步加锁的)
9、准备下一轮调度
如果本次调度时间小于1s,需要让守护线程sleep一段时间之后,再进入下一轮调度。
这个时候,调度标志 preReadSuc 就起作用了。
preReadSuc=true 本轮有调度任务,每秒都扫描。
preReadSuc=false 本轮没有调度任务,等待5s进入一下轮调度。
时间轮线程-ringThread
时间轮 ringThread 也是一个守护线程。字段 ringThreadToStop 控制时间轮线程是否继续。
1、时间校对下一秒
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
2、当前时间轮获取任务
获取当前时间的秒
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
注意:
(1)循环两次,表示获取nowSecond 和 nowSecond - 1 两秒内的任务ID。
(2)写法 (nowSecond+60-i)%60 是为了保证时间秒在 0 ~ 59 之间。
(3)任务获取,并从容器中清除;避免重复执行。
3、逐一调度任务
生成任务线程,放入任务线程池。
执行线程池-fastTriggerPool
执行线程池中的任务,主要经历一下执行步骤。
1、根据jobId查询Job的信息。
2、根据Job信息中的执行器ID,查询执行器信息。
3、重试机制处理。
4、任务起始日志,插入数据库。
5、调度任务触发执行器。
6、任务结束日志,更新数据库。