高并发之——深度解析ScheduledFutureTask类源码

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: JDK 1.5提供的ScheduledThreadPoolExecutor执行定时任务时,会将任务封装成ScheduledFutureTask对象。那么,ScheduledFutureTask对象有何特殊之处?今天,我们就一起来手撕ScheduledFutureTask类的源码,来深入理解ScheduledFutureTask类的细节。

JDK 1.5提供的ScheduledThreadPoolExecutor执行定时任务时,会将任务封装成ScheduledFutureTask对象。那么,ScheduledFutureTask对象有何特殊之处?今天,我们就一起来手撕ScheduledFutureTask类的源码,来深入理解ScheduledFutureTask类的细节。

类的层级关系

从ScheduledFutureTask类的定义可以看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的私有内部类,继承了FutureTask类,并实现了RunnableScheduledFuture接口。也就是说,ScheduledFutureTask具有FutureTask类的所有功能,并实现了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask类的定义如下所示。

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>

我们也可以从ScheduledFutureTask类的层级关系图,更加清晰的看出ScheduledFutureTask类继承了哪些类和实现了哪些接口,有关ScheduledFutureTask类的层级关系图如下所示。

微信图片_20211118120243.jpg

接下来,我们就拆解ScheduledFutureTask类的源码,探究其细节信息。我们还是带着一些问题来解读ScheduledFutureTask类的源码,例如为何将任务封装成ScheduledFutureTask对象?在定时任务中,如何区分不同的 ScheduledFutureTask任务?

重要的成员变量

ScheduledFutureTask类中提供了几个成员变量,如下所示。

//任务添加到ScheduledThreadPoolExecutor中被分配的唯一序列号
private final long sequenceNumber;
//下次执行任务的时间
private long time;
//任务执行的周期
private final long period;
//ScheduledFutureTask对象,实际指向当前对象本身
RunnableScheduledFuture<V> outerTask = this;
//当前任务在延迟队列中的索引
//能够更加方便的取消当前任务
int heapIndex;

我们来看几个重要的属性。

  • sequenceNumber:任务添加到ScheduledThreadPoolExecutor中被分配的唯一序列号,可以根据这个序列号确定唯一的一个任务,如果在定时任务中,如果一个任务是周期性执行的,但是它们的sequenceNumber的值相同,则被视为是同一个任务。
  • time:下一次执行任务的时间。
  • period:任务的执行周期。
  • outerTask:ScheduledFutureTask对象,实际指向当前对象本身。此对象的引用会被传入到周期性执行任务的ScheduledThreadPoolExecutor类的reExecutePeriodic方法中。
  • heapIndex:当前任务在延迟队列中的索引,这个索引能够更加方便的取消当前任务。

构造方法

ScheduledFutureTask类继承了FutureTask类,并实现了RunnableScheduledFuture接口。在ScheduledFutureTask类中提供了如下构造方法。

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

通过源码可以看到,在ScheduledFutureTask类的构造方法中,首先会调用FutureTask类的构造方法为FutureTask类的callable和state成员变量赋值,接下来为ScheduledFutureTask类的time、period和sequenceNumber成员变量赋值。理解起来比较简单。

getDelay方法

我们先来看getDelay方法的源码,如下所示。

//获取下次执行任务的时间距离当前时间的纳秒数
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

getDelay方法比较简单,主要用来获取下次执行任务的时间距离当前系统时间的纳秒数。

compareTo方法

ScheduledFutureTask类在类的结构上实现了Comparable接口,compareTo方法主要是对Comparable接口定义的compareTo方法的实现。源码如下所示。

public int compareTo(Delayed other) {
    if (other == this) 
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

这段代码看上去好像是对各种数值类型数据的比较,本质上是对延迟队列中的任务进行排序。排序规则为:首先比较延迟队列中每个任务下次执行的时间,下次执行时间距离当前时间短的任务会排在前面;如果下次执行任务的时间相同,则会比较任务的sequenceNumber值,sequenceNumber值小的任务会排在前面。

isPeriodic方法

isPeriodic方法的源代码如下所示。

//判断是否是周期性任务
public boolean isPeriodic() {
    return period != 0;
}

这个方法的作用比较简单,主要是用来判断当前任务是否是周期性任务。这里只要判断运行任务的执行周期不等于0就能确定为周期性任务了。因为无论period的值是大于0还是小于0,当前任务都是周期性任务。

setNextRunTime方法

setNextRunTime方法的作用主要是设置当前任务下次执行的时间,源码如下所示。

private void setNextRunTime() {
    long p = period;
    //固定频率,上次执行任务的时间加上任务的执行周期
    if (p > 0)
        time += p;
    //相对固定的延迟执行,当前系统时间加上任务的执行周期
    else
        time = triggerTime(-p);
}

这里,再一次证明了使用isPeriodic方法判断当前任务是否为周期性任务时,只要判断period的值是否不等于0就可以了。因为如果当前任务是固定频率执行的周期性任务,会将周期period当作正数来处理;如果是相对固定的延迟执行当前任务,则会将周期period当作负数来处理。

这里,我们看到在setNextRunTime方法中,调用了ScheduledThreadPoolExecutor类的triggerTime方法。接下来,我们看下triggerTime方法的源码。

ScheduledThreadPoolExecutor类的triggerTime方法

triggerTime方法用于获取延迟队列中的任务下一次执行的具体时间。源码如下所示。

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,则直接返回delay,否则需要处理溢出的情况。

我们看到在triggerTime方法中处理防止溢出的逻辑使用了ScheduledThreadPoolExecutor类的overflowFree方法,接下来,我们就看看ScheduledThreadPoolExecutor类的overflowFree方法的实现。

ScheduledThreadPoolExecutor类的overflowFree方法

overflowFree方法的源代码如下所示。

private long overflowFree(long delay) {
    //获取队列中的节点
    Delayed head = (Delayed) super.getQueue().peek();
    //获取的节点不为空,则进行后续处理
    if (head != null) {
        //从队列节点中获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);
        //如果从队列中获取的延迟时间小于0,并且传递的delay
        //值减去从队列节点中获取延迟时间小于0
        if (headDelay < 0 && (delay - headDelay < 0))
            //将delay的值设置为Long.MAX_VALUE + headDelay
            delay = Long.MAX_VALUE + headDelay;
    }
    //返回延迟时间
    return delay;
}

通过对overflowFree方法的源码分析,可以看出overflowFree方法本质上就是为了限制队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,防止在compareTo方法中溢出。

cancel方法

cancel方法的作用主要是取消当前任务的执行,源码如下所示。

public boolean cancel(boolean mayInterruptIfRunning) {
    //取消任务,返回任务是否取消的标识
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    //如果任务已经取消
    //并且需要将任务从延迟队列中删除
    //并且任务在延迟队列中的索引大于或者等于0
    if (cancelled && removeOnCancel && heapIndex >= 0)
        //将当前任务从延迟队列中删除
        remove(this);
    //返回是否成功取消任务的标识
    return cancelled;
}

这段代码理解起来相对比较简单,首先调用取消任务的方法,并返回任务是否已经取消的标识。如果任务已经取消,并且需要移除任务,同时,任务在延迟队列中的索引大于或者等于0,则将当前任务从延迟队列中移除。最后返回任务是否成功取消的标识。

run方法

run方法可以说是ScheduledFutureTask类的核心方法,是对Runnable接口的实现,源码如下所示。

public void run() {
    //当前任务是否是周期性任务
    boolean periodic = isPeriodic();
    //线程池当前运行状态下不能执行周期性任务
    if (!canRunInCurrentRunState(periodic))
        //取消任务的执行
        cancel(false);
    //如果不是周期性任务
    else if (!periodic)
        //则直接调用FutureTask类的run方法执行任务
        ScheduledFutureTask.super.run();
    //如果是周期性任务,则调用FutureTask类的runAndReset方法执行任务
    //如果任务执行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        //设置下次执行任务的时间
        setNextRunTime();
        //重复执行任务
        reExecutePeriodic(outerTask);
    }
}

整个方法的逻辑就是:首先判断当前任务是否是周期性任务。如果线程池当前运行状态下不能执行周期性任务,则取消任务的执行。如果当前任务不是周期性任务,则直接调用FutureTask类的run方法执行任务;如果当前任务是周期性任务,则调用FutureTask类的runAndReset方法执行任务,并且如果任务执行成功,则设置下次执行任务的时间,同时,将任务设置为重复执行。

这里,调用了FutureTask类的run方法和runAndReset方法,并且调用了ScheduledThreadPoolExecutor类的reExecutePeriodic方法。接下来,我们分别看下这些方法的实现。

FutureTask类的run方法

FutureTask类的run方法源码如下所示。

public void run() {
    //状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
    //状态如果是NEW,则尝试把当前执行线程保存在runner字段中
    //如果赋值失败则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //任务异常
                setException(ex);
            }
            if (ran)
                //任务正常执行完毕
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        //如果任务被中断,执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

代码的整体逻辑为:判断当前任务的state是否等于NEW,如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回;如果状态为NEW则接着会通过unsafe类把任务执行线程引用CAS的保存在runner字段中,如果保存失败,则直接返回;执行任务;如果任务执行发生异常,则调用setException()方法保存异常信息。

FutureTask类的runAndReset方法

方法的源码如下所示。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

FutureTask类的runAndReset方法与run方法的逻辑基本相同,只是runAndReset方法会重置当前任务的执行状态。

ScheduledThreadPoolExecutor类的reExecutePeriodic方法

ScheduledThreadPoolExecutor类的reExecutePeriodic方法的源代码如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //线程池当前状态下能够执行任务
    if (canRunInCurrentRunState(true)) {
        //将任务放入队列
        super.getQueue().add(task);
        //线程池当前状态下不能执行任务,并且成功移除任务
        if (!canRunInCurrentRunState(true) && remove(task))
            //取消任务
            task.cancel(false);
        else
            //调用ThreadPoolExecutor类的ensurePrestart()方法
            ensurePrestart();
    }
}

总体来说reExecutePeriodic方法的逻辑比较简单,需要注意的是:调用reExecutePeriodic方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务。

相关文章
|
20天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
1月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
53 6
|
5天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
5天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
|
9天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
19天前
|
缓存 Java 开发者
Spring高手之路22——AOP切面类的封装与解析
本篇文章深入解析了Spring AOP的工作机制,包括Advisor和TargetSource的构建与作用。通过详尽的源码分析和实际案例,帮助开发者全面理解AOP的核心技术,提升在实际项目中的应用能力。
17 0
Spring高手之路22——AOP切面类的封装与解析
|
27天前
|
测试技术 Python
python自动化测试中装饰器@ddt与@data源码深入解析
综上所述,使用 `@ddt`和 `@data`可以大大简化写作测试用例的过程,让我们能专注于测试逻辑的本身,而无需编写重复的测试方法。通过讲解了 `@ddt`和 `@data`源码的关键部分,我们可以更深入地理解其背后的工作原理。
23 1
|
1月前
|
JSON 图形学 数据格式
Json☀️ 一、认识Json是如何解析成类的
Json☀️ 一、认识Json是如何解析成类的
|
1月前
|
开发者 Python
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
67 1
|
1月前
|
开发者 Python
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
121 1

推荐镜像

更多