已包含中文注释的源码压缩包:
xxl-job-master.zip(7.8 MB)分析流程:
xxljob直播.xmind(493 KB)1.架构分析2.执行源码核心代码位置如下:1-initI18n()初始化国际化资源,对应配置文件:resources:i18n2-JobTriggerPoolHelper.toStart()触发器线程池:创建快慢线程池,提升任务执行效率如果一个任务在1min内超过10次提交,就默认是慢任务,为了防止慢任务影响系统吞吐量,将其放在慢线程池中运行快线程池定义:最大线程数=200,等待队列=1000慢线程池定义:最大线程数=100,等待队列=2000addTrigger方法默认选择快线程池,如果任务在1min内超过10次提交,就是慢线程池真正的触发器操作代码3-JobRegistryHelper.getInstance().start()任务注册器线程池:将上线机器刷新(最近90s有心跳),将下线机器移除(90s内没心跳),更新执行组初始化服务上线/下线线程池:将新增的IP添加到组中,将断线的IP及时清理(因为执行器会上下线,xxl-job会保证及时更新当前在线的执行器)初始化心跳检测的线程池:xxl-job会把心跳信息保存在xxl_job_registry,借助心跳检测机制,保证机器下线能够及时清楚,机器上线及时发现4-JobFailMonitorHelper.getInstance().start()重试报警机制:因为任务是异步调用,需要有一个进程持续监听执行结果,主要做的是:读取本地数据,更新数据状态,未达到失败次数就重试,达到失败次数就告警借助mybatis查询本地数据库执行失败、且告警状态=0[未告警]的数据日志ID,读取表:xxl_job_log更新告警状态=-1[锁定,避免多线程并发干扰],0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败基于日志ID取出完整的日志信息,读取表:xxl_job_log基于ID获取失败日志对应的任务信息,读取表:xxl_job_info如果失败任务可充实次数大于0,表示还能重试,就继续重试如果配置了告警信息,就实现报警报警后更新本地告警状态,2或35-JobCompleteHelper.getInstance().start()调度系统调度执行器后,执行器突然宕机,调度系统就可以根据这个线程感知任务到底执行成功还是失败任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;6-JobLogReportHelper.getInstance().start()清理过期日志,没什么需要详细讲解的7-JobScheduleHelper.getInstance().start()从xxl_job_info表中找出当前时间+5s的所有执行器数据,然后根据其调度时间判断立即调度还是加入时间戳 取出当前时间 + 5s内所有待执行的执行器数据(避免循环本身导致错过执行时间)建立数据库链接关闭事务的自动提交悲观锁,锁定数据过期处理策略
可能过期的原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过
从数据库取出当前时间+5s的所有任务当前时间:超过调度时间+5s判断任务执行状态是否是:FIRE_ONCE_NOW,如果是就立马调度一次刷新调度时间,避免下一次再错过当前时间:超过调度时间,但还没超过5s调度一次更新调度时间,避免下一次再错过当前时间:没超过调度时间,把调度时间加入到时间轮中获取调度时间的秒将调度时间作为key,任务id作为value,放进调度任务时间轮的map中(其执行代码在:238行)刷新下一次调度时间(这里就不是new Date()了,而是真实的执行时间)更新每一个任务的最近一次执行时间、下一次执行时间、执行状态 时间轮执行原理
时间轮执行原理:com.xxl.job.admin.core.thread.JobScheduleHelper#ringThread
先延迟0-1秒,以保证加载所有数据获取当前时间对应的秒数避免处理耗时太长,跨过刻度,向前校验一个刻度这里的ringData就是上面场景3中添加进去的数据,当前文件代码:149行 当前秒+1是为了防止时间轮中的任务由于意外没有执行,从而有一个补偿机制调度任务清空已处理数据