絮叨
因为之前用xxl-job 用的也很多了,然后自己也把他的边缘业务代码改换成自己公司合适的用法了,比如说我们的动态定时任务,调用接口啥的,但是对应他的核心源码,一直没有机会梳理一下,固找个时间,一起梳理梳理,但是我想说的是本文争对的是原理的解读,不适合XXL-Job的初学者,我这边默认你已经对于XXl-job至少是有一定的了解,如果要去入门,先去下面的官方文档
架构图
上图是我们要进行源码分析的2.1版本的整体架构图。其分为两大块,调度中心和执行器,我们先分析调度中心,也就是xxl-job-admin这个包的代码。
主流程
XXL-JOB 他是基于SpringBoot的,所以我们的第一步当然是启动Spring的应用程序 XxlJobAdminApplication,然后他里面有个配置类XxlJobAdminConfig 实现了 InitializingBean ,Spring在初始化之后会加载这个Bean,如下图
然后我们来看看 xxlJobScheduler.init();方法,如下图
- 第一步国际化相关。
- 第二步监控相关。
- 第三步失败重试相关。
- 第四步启动admin端服务,接收注册请求等。
- 第五步JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的
其实上面的步骤还是很多地方需要我们去一个个了解的,但是我们今天就挑些来学习,我们直接看最后一步JobScheduleHelper
JobScheduleHelper
非常重要的一个类,我们来看看他的start()方法
其实很简单,就是启动了2后台线程,这2个线程是死循环的,但是这2个线程确也是很重要的。我们一个个来看
我直接把代码拷贝了过来,然后我们来慢慢说吧
- 死循环内的代码如上图,首先利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务。
其实这种代码我们撸业务的。谁不是闭着眼就来,哈哈 返回的是一个JobInfo的List,如果是我们写代码会怎么样?第一个当然是判断是否为空,然后遍历去处理每一个对象,还别说xxl-job也是这样的,然后根据对象里面的属性不同做不同的业务逻辑处理,哈哈,是不是觉得开源框架和我们平时写的代码也差不多。
然后根据triggerNextTime这个字段来走不同的分支,一共有3个分支,我们来看看这几个分支的逻辑
- 第一个分支当前任务的触发时间已经超时5秒以上了,不在执行,直接计算下一次触发时间。
网络异常,图片无法展示|
- 第二个分支为触发时间已满足,利用JobTriggerPoolHelper这个类进行任务调度,之后判断下一次执行时间如果在5秒内,进行此任务数据的缓存,处理逻辑与第三个分支一样。
- 我们来看看那个pushTimeRing(ringSecond, jobInfo.getId());
private void pushTimeRing(int ringSecond, int jobId){ // push async ring List<Integer> ringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList<Integer>(); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId); logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); } 复制代码
ringData是以0到59的整数为key,以jobId集合为value的Map集合。这个集合数据的处理逻辑,就在我们第二个守护线程ringThread中。
- 我们来看看我们的第二个守护线程吧
// ring thread ringThread = new Thread(new Runnable() { @Override public void run() { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } while (!ringThreadToStop) { try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } // next second, align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); 复制代码
根据当前秒数刻度和前一个刻度进行时间轮的任务获取,之后和上文一样,利用JobTriggerPoolHelper进行任务调度。
时序图