Java定时任务调度原理解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 随着互联网应用的快速普及,开发者们往往会遇到业务逻辑复杂、时间驱动类型业务、数据处理、离线分析等场景,比如整点发送优惠券、按月批量统计报表等,为了减少对核心系统的影响,我们通常会采用定时任务框架来处理。定时任务顾名思义就是预先设定任务执行时间,到点后任务自动被调度执行,下面列出几种常见的定时任务框架并简单介绍其实现原理。

1,定时任务管理简介

随着互联网应用的快速普及,开发者们往往会遇到业务逻辑复杂、时间驱动类型业务、数据处理、离线分析等场景,比如整点发送优惠券、按月批量统计报表等,为了减少对核心系统的影响,我们通常会采用定时任务框架来处理。定时任务顾名思义就是预先设定任务执行时间,到点后任务自动被调度执行,下面列出几种常见的定时任务框架并简单介绍其实现原理。

2 Java 原生定时任务调度器

2.1 Timer

2.1.1 简介

Timer是从Java SDK1.3开始提供的最原生定时任务执行解决方案,位于java.util包下,主要包括如下四类角色:

  • Timer:任务调度器
  • TimerThread:任务执行器
  • TimerTask:定时任务
  • TaskQueue:任务队列,队列中的任务按执行时间先后顺序排序,队首执行时间最靠前

2.1.2 关键源码解析

Timer的实现非常简单,翻看源代码我们来看下它的核心调度和执行处理过程:

privatevoidmainLoop() {
while (true) {
try {
TimerTasktask;
booleantaskFired;
synchronized(queue) {  // 同步锁住队列                 .....
task=queue.getMin();  // 取出队首任务 synchronized(task.lock)  { 
                    .....
if (taskFired= (executionTime<=currentTime)) {  // 如果达到执行时间设置可执行标志if (task.period==0) { 
queue.removeMin();   // 移除队列task.state=TimerTask.EXECUTED;   // 标记完成状态                        } else {
queue.rescheduleMin(
task.period<0?currentTime-task.period : executionTime+task.period);
                        }
                    }
                }
if (!taskFired) queue.wait(executionTime-currentTime); // 如果时间没达到,等待△time            }
if (taskFired)  // 如果可执行则runtask.run();
        } catch(InterruptedExceptione) {
        }
    }
}


借助同步悲观锁机制,整个执行器采用单线程无限遍历队列来实现,不断取出队首任务,判断是否达到执行时间,如果达到则执行,如果没有达到则等待直到达到执行时间。执行过程可简化为如下流程:

2.1.3 优缺点

优点:简单易用。

缺陷:不支持多线程;对系统时钟敏感;当前任务异常会终止队列中后续任务执行;不支持定时表达式。

2.2 ScheduledExecutorService

2.2.1 简介

我们知道Java1.5是Java历史版本上的一个重大转折点,天才并发大师Doug Lee为Java带来了完整的线程池

编程框架J.U.C,结束了Java只能手动创建线程的历史,使得多线程编程更加简单、安全和高效。我们先来看下J.U.C包下线程池的类图:

其中有两个可直接使用的实例化类,ThreadPoolExecutor和ScheduledThreadPoolExecutor,前者是最基础的通用线程池,使用者可以通过灵活的构造函数传参创建所需要的线程池。后者就是我们要介绍的定时任务管理器。ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说任务是并发执行,互不影响。需要注意的是只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是出于轮询任务的状态。

ScheduledExecutorService定时任务框架体系也拥有定时任务的4个主要角色:

  • ScheduledThreadPoolExecutor:调度器
  • ThreadPoolExecutor:执行器
  • ScheduledFutureTask:定时任务,记录执行时间和周期
  • DelayedWorkQueue:任务队列

2.2.2 关键源码解析

在深入了解Java并发包下的定时任务调度之前,强烈建议先认真阅读2.1中Timer的核心思路,因为ScheduledExecutorService实现定时任务调度的最本质思路和Timer基本如出一辙,最大的改变就是将Timer的单线程变成了多线程。

我们先来看下其实现类ScheduledThreadPoolExecutor的创建过程:

publicScheduledThreadPoolExecutor(intcorePoolSize,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
newDelayedWorkQueue(), threadFactory, handler);
}

可以看出它直接调用了通用父类线程池ThreadPoolExecutor来实例化自己,也就说任务的执行是通过ThreadPoolExecutor来实现,另外值得关注的是任务队列它采用了延迟队列DelayedWorkQueue,DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类。我们来看下DelayedWorkQueue对定时任务是如何管理的:

// 添加定时任务publicbooleanoffer(Runnablex) {
    ...
finalReentrantLocklock=this.lock;
lock.lock();
try {
inti=size;
if (i>=queue.length) grow();
size=i+1;
if (i==0) {  // 队列为空,则直接加到队首queue[0] =e;
setIndex(e, 0);
        } else {
siftUp(i, e);   // 队列不对空,调用siftUp方法决定x在队列中的顺序,shiftUp通过调用任务的compareTo来实现将定时执行时间最早的放在最前面        }
if (queue[0] ==e) {
leader=null;
available.signal();
        }
    } finally {
lock.unlock();
    }
returntrue;
}

上述的siftUp方法使得任务队列和Timer中的任务队列达到了同样的效果,定时执行时间最早的放在队列最前面,我们再来看下它是如何达成定时执行的。

在ScheduledThreadPoolExecutor中我们可以看到所有提交的定时任务最后都调用了ensurePrestart()方法,它是父类线程池的方法,调用了addWorker()方法,该方法是线程池对线程的核心管理方法,不过不是本文章的重点不细讲,ensurePrestart方法中调用addWorker方法传递的firstTask都是null,也就是说给线程池提交了一个空任务,那么线程池执行任务都需要从队列中拉取的任务,那我们来看下队列拿任务的take方法的实现:

publicRunnableScheduledFuture<?>take() throwsInterruptedException {
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?>first=queue[0]; // 直接获取队首任务if (first==null)
available.await();
else {
longdelay=first.getDelay(NANOSECONDS);  // 队首任务执行时间和当前时间的时间差if (delay<=0)
returnfinishPoll(first); // 小于0则直接弹出执行first=null; 
if (leader!=null)
available.await();
else {
ThreadthisThread=Thread.currentThread();
leader=thisThread;
try {
available.awaitNanos(delay);  // 否则等待delay时间                    } finally {
if (leader==thisThread)
leader=null;
                    }
                }
            }
        }
    } finally {
if (leader==null&&queue[0] !=null)
available.signal();
lock.unlock();
    }
}

到这里大家发现没有,任务定时执行的思路和Timer其实是一样的,通过判断队首的定时任务的执行时间是否达到,达到则弹出执行,否则等待阻塞。

2.2.3 优缺点

优点:支持多定时任务并发执行;支持延迟执行;当前任务异常不会终止队列中后续任务执行。

缺点:不支持定时表达式;不支持分布式。

3 第三方定时任务调度框架

定时任务处理方式的核心思路其实都差不多,而第2章节介绍的是Java对其最原始最底层的处理方式,因此通过剖析源码的方式较为详细的介绍。但是实际企业生产使用过程中,他们并没有那么方便,比如不支持定时表达式,不支持界面化操作,不支持自动告警,不支持定时时间实时修改等,从而产生了大量的企业级定时任务框架,比如经典的Quartz,开源的xxl-job,阿里巴巴的schduleX等,这些框架通常功能丰富,但是往往又很重,对于轻量级应用而言引用并不划算,感兴趣的同学可以查阅更多资料详细了解。下面将介绍一款Spring自实现的定时任务调度方案Spring Scheduler。

4 Spring定时任务调度器

4.1 简介

为了简化使用者对动态任务的调度,Spring自实现了一个轻量级任务调度管理器进行动态任务管理与调度,使用者只需要在配置类上加入@EnableSchedule注解即可开启对计划任务的支持,然后在要执行计划任务的方法上加上@Schedule即可。Spring通过接口TaskScheduler和TaskExecutor这两个接口的方式为异步定时任务提供了一种抽象,前者拥有任务调度能力,后者拥有任务执行能力。

4.2 关键源码解析

4.2.1 @EnableSchedule注解到底做了什么

查看这个注解的元注解可以看到@Configuration和@Import(SchedulingConfiguration.class),@Import是用来导入配置类的,查看SchedulingConfiguration.class发现它向Spring容器声明了一个Bean:ScheduledAnnotationBeanPostProceessor,看一下这个类的解释:

* <p>This post-processor is automatically registered by Spring's

* {@code <task:annotation-driven>} XML element, and also by the

* {@link EnableScheduling @EnableScheduling} annotation.

这个类在初始化时主要会做两件事情:

(1)初始化TaskScheduler

TaskScheduler是实际任务的调度器。ScheduledAnnotationBeanPostProceessor这个类实现了ApplicationContextAware接口,重写了onApplicationEvent方法,这个方法会调用finishRegistration()方法,finishRegistration()方法在最后通过一个register调用了afterPropertiesSet()方法,这里先不讨论register是做什么的,afterPropertiesSet()方法第一步就是判断调度器是否为空,显然如果我们只单纯的加了一个@EnableSchedule注解,调度器为空,那么这里就会创建调度器,看这里创建的思路:

if (this.taskScheduler==null) {
this.localExecutor=Executors.newSingleThreadScheduledExecutor();
this.taskScheduler=newConcurrentTaskScheduler(this.localExecutor);
}

注意了,这里首先创建的是一个单线程的任务执行器,然后把这个执行器传递给新建的调度器,这个时候该任务调度器拥有了单线程任务执行能力,这也是为什么如果你只是单纯的引入此注解,多任务在执行的时候会发生阻塞。

(2)寻找所有加了@Scheduled和@Schedules注解的方法

postProcessAfterInitialization()方法会在所有bean初始化完后了找到所有加了@Scheduled和@Schedules注解的方法,并解析定时表达式进行任务初始化准备工作。

4.2.2 如何支持任务的并发执行性

从上面的分析中可以看出,@EnableSchedule注解默认对任务的执行采用的是单线程的方式,即所有任务都必须等待当前任务执行完成后才可以继续调度执行,在多任务系统中默认实现方式显然不能满足要求。如果需要支持多任务的并发执行,Spring也为我们提供了实现方式,实现SchedulingConfigurer接口,为调度器TaskScheduler创建自定义的任务执行器,创建方式如下:

@Configuration@EnableSchedulingpublicclassTaskScheduleConfigimplementsSchedulingConfigurer {
privateThreadPoolTaskSchedulertaskScheduler;
@OverridepublicvoidconfigureTasks(ScheduledTaskRegistrartaskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
    }
@Bean(destroyMethod="shutdown")
publicThreadPoolTaskSchedulertaskScheduler(){
//创建一个线程池调度器taskScheduler=newThreadPoolTaskScheduler();
//设置线程池容量taskScheduler.setPoolSize(2);
//线程名前缀taskScheduler.setThreadNamePrefix("task-");
//等待时常taskScheduler.setAwaitTerminationSeconds(60);
//当调度器shutdown被调用时等待当前被调度的任务完成taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//设置当任务被取消的同时从当前调度器移除的策略taskScheduler.setRemoveOnCancelPolicy(true);
//设置任务注册器的调度器returntaskScheduler;
    }
}

4.3,Spring如何实现任务异步化

Spring的定时任务执行都是同步的,有时候我们会碰到单任务执行时间很长的问题,需要将任务的执行异步化,可以将@Async注解和@Scheduled注解联合起来使用。但是这里会有一个问题,当任务被异步化的时候他会直接告诉任务调度器当前任务已经执行完了,因此下一次任务会根据定时时间准点执行,有可能实际上上一次任务并没有执行完。在某些场景下会导致数据异常,比如:

@Async@Scheduled(fixedRate=10*1000L)
publicvoidtaskMonitor() {
List<Task>taskList=taskBO.findUnfinishedTaskList();
for (Tasktask : taskList) {
executeInLock("monitor_"+task.getId(), () -> {
List<Subtask>subtaskList=taskBO.findUnfinishedSubtaskListByTaskId(task.getId());
if (subtaskList.isEmpty()) {
taskBO.completeTask(task);
            } else {
//未完成的subtask,true表示任务执行失败,记录错误log、发通知if (checkUnfinishSubtaskList(subtaskList)) {
logger.error("task execute fail:"+JSON.toJSONString(task));
taskBO.failTask(task, subtaskList);
sendDingtalk("任务执行超时,请关注:"+JSON.toJSONString(task));
                }
            }
returntrue;
        }, 8L, false);
    }
}

如果taskMonitor第一次没有执行完,针对某个taskA,正在执行completeTask方法,completeTask里面有insert DB和更新taskA状态的操作,但是数据还没有插入到DB,然后锁超时释放,这个时候taskMonitor又开始调度执行了,而taskA又被查询出来,再次执行completeTask操作,这样就会有重复数据插入到DB,因此尽量避免两个注解同时使用。如果确实需要将定时任务异步化,可以把锁的超时释放时间设置长一些,保证任务执行完在释放。

相关文章
|
3天前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
31 13
|
22天前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
56 1
|
20天前
|
Java 编译器
Java 泛型详细解析
本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。
30 2
Java 泛型详细解析
|
20天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
18天前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
18天前
|
存储 监控 算法
Java虚拟机(JVM)垃圾回收机制深度解析与优化策略####
本文旨在深入探讨Java虚拟机(JVM)的垃圾回收机制,揭示其工作原理、常见算法及参数调优方法。通过剖析垃圾回收的生命周期、内存区域划分以及GC日志分析,为开发者提供一套实用的JVM垃圾回收优化指南,助力提升Java应用的性能与稳定性。 ####
|
20天前
|
Java 数据库连接 开发者
Java中的异常处理机制:深入解析与最佳实践####
本文旨在为Java开发者提供一份关于异常处理机制的全面指南,从基础概念到高级技巧,涵盖try-catch结构、自定义异常、异常链分析以及最佳实践策略。不同于传统的摘要概述,本文将以一个实际项目案例为线索,逐步揭示如何高效地管理运行时错误,提升代码的健壮性和可维护性。通过对比常见误区与优化方案,读者将获得编写更加健壮Java应用程序的实用知识。 --- ####
|
22天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
22天前
|
存储 供应链 算法
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
47 0
|
Java API 调度
Java调度框架Quartz 2.2.1
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/18792129 Java调度框架Quartz 2.2.1 Java调度框架Quartz 2.2.1版在前不久发布了,Quartz可以用来创建简单或为运行十个,百个,甚至是好几万个Jobs这样复杂的日程序表。
868 0

推荐镜像

更多
下一篇
DataWorks