1,定时任务管理简介
随着互联网应用的快速普及,开发者们往往会遇到业务逻辑复杂、时间驱动类型业务、数据处理、离线分析等场景,比如整点发送优惠券、按月批量统计报表等,为了减少对核心系统的影响,我们通常会采用定时任务框架来处理。定时任务顾名思义就是预先设定任务执行时间,到点后任务自动被调度执行,下面列出几种常见的定时任务框架并简单介绍其实现原理。
2 Java 原生定时任务调度器
2.1 Timer
2.1.1 简介
Timer是从Java SDK1.3开始提供的最原生定时任务执行解决方案,位于java.util包下,主要包括如下四类角色:
- Timer:任务调度器
- TimerThread:任务执行器
- TimerTask:定时任务
- TaskQueue:任务队列,队列中的任务按执行时间先后顺序排序,队首执行时间最靠前
2.1.2 关键源码解析
Timer的实现非常简单,翻看源代码我们来看下它的核心调度和执行处理过程:
privatevoidmainLoop() { while (true) { try { TimerTasktask; booleantaskFired; synchronized(queue) { // 同步锁住队列 ..... task=queue.getMin(); // 取出队首任务 synchronized(task.lock) { ..... if (taskFired= (executionTime<=currentTime)) { // 如果达到执行时间设置可执行标志if (task.period==0) { queue.removeMin(); // 移除队列task.state=TimerTask.EXECUTED; // 标记完成状态 } else { queue.rescheduleMin( task.period<0?currentTime-task.period : executionTime+task.period); } } } if (!taskFired) queue.wait(executionTime-currentTime); // 如果时间没达到,等待△time } if (taskFired) // 如果可执行则runtask.run(); } catch(InterruptedExceptione) { } } }
借助同步悲观锁机制,整个执行器采用单线程无限遍历队列来实现,不断取出队首任务,判断是否达到执行时间,如果达到则执行,如果没有达到则等待直到达到执行时间。执行过程可简化为如下流程:
2.1.3 优缺点
优点:简单易用。
缺陷:不支持多线程;对系统时钟敏感;当前任务异常会终止队列中后续任务执行;不支持定时表达式。
2.2 ScheduledExecutorService
2.2.1 简介
我们知道Java1.5是Java历史版本上的一个重大转折点,天才并发大师Doug Lee为Java带来了完整的线程池
编程框架J.U.C,结束了Java只能手动创建线程的历史,使得多线程编程更加简单、安全和高效。我们先来看下J.U.C包下线程池的类图:
其中有两个可直接使用的实例化类,ThreadPoolExecutor和ScheduledThreadPoolExecutor,前者是最基础的通用线程池,使用者可以通过灵活的构造函数传参创建所需要的线程池。后者就是我们要介绍的定时任务管理器。ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说任务是并发执行,互不影响。需要注意的是只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是出于轮询任务的状态。
ScheduledExecutorService定时任务框架体系也拥有定时任务的4个主要角色:
- ScheduledThreadPoolExecutor:调度器
- ThreadPoolExecutor:执行器
- ScheduledFutureTask:定时任务,记录执行时间和周期
- DelayedWorkQueue:任务队列
2.2.2 关键源码解析
在深入了解Java并发包下的定时任务调度之前,强烈建议先认真阅读2.1中Timer的核心思路,因为ScheduledExecutorService实现定时任务调度的最本质思路和Timer基本如出一辙,最大的改变就是将Timer的单线程变成了多线程。
我们先来看下其实现类ScheduledThreadPoolExecutor的创建过程:
publicScheduledThreadPoolExecutor(intcorePoolSize, ThreadFactorythreadFactory, RejectedExecutionHandlerhandler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, newDelayedWorkQueue(), threadFactory, handler); }
可以看出它直接调用了通用父类线程池ThreadPoolExecutor来实例化自己,也就说任务的执行是通过ThreadPoolExecutor来实现,另外值得关注的是任务队列它采用了延迟队列DelayedWorkQueue,DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类。我们来看下DelayedWorkQueue对定时任务是如何管理的:
// 添加定时任务publicbooleanoffer(Runnablex) { ... finalReentrantLocklock=this.lock; lock.lock(); try { inti=size; if (i>=queue.length) grow(); size=i+1; if (i==0) { // 队列为空,则直接加到队首queue[0] =e; setIndex(e, 0); } else { siftUp(i, e); // 队列不对空,调用siftUp方法决定x在队列中的顺序,shiftUp通过调用任务的compareTo来实现将定时执行时间最早的放在最前面 } if (queue[0] ==e) { leader=null; available.signal(); } } finally { lock.unlock(); } returntrue; }
上述的siftUp方法使得任务队列和Timer中的任务队列达到了同样的效果,定时执行时间最早的放在队列最前面,我们再来看下它是如何达成定时执行的。
在ScheduledThreadPoolExecutor中我们可以看到所有提交的定时任务最后都调用了ensurePrestart()方法,它是父类线程池的方法,调用了addWorker()方法,该方法是线程池对线程的核心管理方法,不过不是本文章的重点不细讲,ensurePrestart方法中调用addWorker方法传递的firstTask都是null,也就是说给线程池提交了一个空任务,那么线程池执行任务都需要从队列中拉取的任务,那我们来看下队列拿任务的take方法的实现:
publicRunnableScheduledFuture<?>take() throwsInterruptedException { finalReentrantLocklock=this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?>first=queue[0]; // 直接获取队首任务if (first==null) available.await(); else { longdelay=first.getDelay(NANOSECONDS); // 队首任务执行时间和当前时间的时间差if (delay<=0) returnfinishPoll(first); // 小于0则直接弹出执行first=null; if (leader!=null) available.await(); else { ThreadthisThread=Thread.currentThread(); leader=thisThread; try { available.awaitNanos(delay); // 否则等待delay时间 } finally { if (leader==thisThread) leader=null; } } } } } finally { if (leader==null&&queue[0] !=null) available.signal(); lock.unlock(); } }
到这里大家发现没有,任务定时执行的思路和Timer其实是一样的,通过判断队首的定时任务的执行时间是否达到,达到则弹出执行,否则等待阻塞。
2.2.3 优缺点
优点:支持多定时任务并发执行;支持延迟执行;当前任务异常不会终止队列中后续任务执行。
缺点:不支持定时表达式;不支持分布式。
3 第三方定时任务调度框架
定时任务处理方式的核心思路其实都差不多,而第2章节介绍的是Java对其最原始最底层的处理方式,因此通过剖析源码的方式较为详细的介绍。但是实际企业生产使用过程中,他们并没有那么方便,比如不支持定时表达式,不支持界面化操作,不支持自动告警,不支持定时时间实时修改等,从而产生了大量的企业级定时任务框架,比如经典的Quartz,开源的xxl-job,阿里巴巴的schduleX等,这些框架通常功能丰富,但是往往又很重,对于轻量级应用而言引用并不划算,感兴趣的同学可以查阅更多资料详细了解。下面将介绍一款Spring自实现的定时任务调度方案Spring Scheduler。
4 Spring定时任务调度器
4.1 简介
为了简化使用者对动态任务的调度,Spring自实现了一个轻量级任务调度管理器进行动态任务管理与调度,使用者只需要在配置类上加入@EnableSchedule注解即可开启对计划任务的支持,然后在要执行计划任务的方法上加上@Schedule即可。Spring通过接口TaskScheduler和TaskExecutor这两个接口的方式为异步定时任务提供了一种抽象,前者拥有任务调度能力,后者拥有任务执行能力。
4.2 关键源码解析
4.2.1 @EnableSchedule注解到底做了什么
查看这个注解的元注解可以看到@Configuration和@Import(SchedulingConfiguration.class),@Import是用来导入配置类的,查看SchedulingConfiguration.class发现它向Spring容器声明了一个Bean:ScheduledAnnotationBeanPostProceessor,看一下这个类的解释:
* <p>This post-processor is automatically registered by Spring's
* {@code <task:annotation-driven>} XML element, and also by the
* {@link EnableScheduling @EnableScheduling} annotation.
这个类在初始化时主要会做两件事情:
(1)初始化TaskScheduler
TaskScheduler是实际任务的调度器。ScheduledAnnotationBeanPostProceessor这个类实现了ApplicationContextAware接口,重写了onApplicationEvent方法,这个方法会调用finishRegistration()方法,finishRegistration()方法在最后通过一个register调用了afterPropertiesSet()方法,这里先不讨论register是做什么的,afterPropertiesSet()方法第一步就是判断调度器是否为空,显然如果我们只单纯的加了一个@EnableSchedule注解,调度器为空,那么这里就会创建调度器,看这里创建的思路:
if (this.taskScheduler==null) { this.localExecutor=Executors.newSingleThreadScheduledExecutor(); this.taskScheduler=newConcurrentTaskScheduler(this.localExecutor); }
注意了,这里首先创建的是一个单线程的任务执行器,然后把这个执行器传递给新建的调度器,这个时候该任务调度器拥有了单线程任务执行能力,这也是为什么如果你只是单纯的引入此注解,多任务在执行的时候会发生阻塞。
(2)寻找所有加了@Scheduled和@Schedules注解的方法
postProcessAfterInitialization()方法会在所有bean初始化完后了找到所有加了@Scheduled和@Schedules注解的方法,并解析定时表达式进行任务初始化准备工作。
4.2.2 如何支持任务的并发执行性
从上面的分析中可以看出,@EnableSchedule注解默认对任务的执行采用的是单线程的方式,即所有任务都必须等待当前任务执行完成后才可以继续调度执行,在多任务系统中默认实现方式显然不能满足要求。如果需要支持多任务的并发执行,Spring也为我们提供了实现方式,实现SchedulingConfigurer接口,为调度器TaskScheduler创建自定义的任务执行器,创建方式如下:
publicclassTaskScheduleConfigimplementsSchedulingConfigurer { privateThreadPoolTaskSchedulertaskScheduler; publicvoidconfigureTasks(ScheduledTaskRegistrartaskRegistrar) { taskRegistrar.setScheduler(taskScheduler()); } destroyMethod="shutdown") (publicThreadPoolTaskSchedulertaskScheduler(){ //创建一个线程池调度器taskScheduler=newThreadPoolTaskScheduler(); //设置线程池容量taskScheduler.setPoolSize(2); //线程名前缀taskScheduler.setThreadNamePrefix("task-"); //等待时常taskScheduler.setAwaitTerminationSeconds(60); //当调度器shutdown被调用时等待当前被调度的任务完成taskScheduler.setWaitForTasksToCompleteOnShutdown(true); //设置当任务被取消的同时从当前调度器移除的策略taskScheduler.setRemoveOnCancelPolicy(true); //设置任务注册器的调度器returntaskScheduler; } }
4.3,Spring如何实现任务异步化
Spring的定时任务执行都是同步的,有时候我们会碰到单任务执行时间很长的问题,需要将任务的执行异步化,可以将@Async注解和@Scheduled注解联合起来使用。但是这里会有一个问题,当任务被异步化的时候他会直接告诉任务调度器当前任务已经执行完了,因此下一次任务会根据定时时间准点执行,有可能实际上上一次任务并没有执行完。在某些场景下会导致数据异常,比如:
fixedRate=10*1000L) (publicvoidtaskMonitor() { List<Task>taskList=taskBO.findUnfinishedTaskList(); for (Tasktask : taskList) { executeInLock("monitor_"+task.getId(), () -> { List<Subtask>subtaskList=taskBO.findUnfinishedSubtaskListByTaskId(task.getId()); if (subtaskList.isEmpty()) { taskBO.completeTask(task); } else { //未完成的subtask,true表示任务执行失败,记录错误log、发通知if (checkUnfinishSubtaskList(subtaskList)) { logger.error("task execute fail:"+JSON.toJSONString(task)); taskBO.failTask(task, subtaskList); sendDingtalk("任务执行超时,请关注:"+JSON.toJSONString(task)); } } returntrue; }, 8L, false); } }
如果taskMonitor第一次没有执行完,针对某个taskA,正在执行completeTask方法,completeTask里面有insert DB和更新taskA状态的操作,但是数据还没有插入到DB,然后锁超时释放,这个时候taskMonitor又开始调度执行了,而taskA又被查询出来,再次执行completeTask操作,这样就会有重复数据插入到DB,因此尽量避免两个注解同时使用。如果确实需要将定时任务异步化,可以把锁的超时释放时间设置长一些,保证任务执行完在释放。