【高并发】深度解析ScheduledThreadPoolExecutor类的源代码

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 在【高并发专题】的专栏中,我们深度分析了ThreadPoolExecutor类的源代码,而ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。今天我们就来一起手撕ScheduledThreadPoolExecutor类的源代码。

大家好,我是冰河~~

在【高并发专题】的专栏中,我们深度分析了ThreadPoolExecutor类的源代码,而ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。今天我们就来一起手撕ScheduledThreadPoolExecutor类的源代码。

构造方法

我们先来看下ScheduledThreadPoolExecutor的构造方法,源代码如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

从代码结构上来看,ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,ScheduledThreadPoolExecutor类的构造方法实际上调用的是ThreadPoolExecutor类的构造方法。

schedule方法

接下来,我们看一下ScheduledThreadPoolExecutor类的schedule方法,源代码如下所示。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    //如果传递的Runnable对象和TimeUnit时间单位为空
    //抛出空指针异常
    if (command == null || unit == null)
        throw new NullPointerException();
    //封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    //执行延时任务
    delayedExecute(t);
    //返回任务
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    //如果传递的Callable对象和TimeUnit时间单位为空
    //抛出空指针异常
    if (callable == null || unit == null)
        throw new NullPointerException();
    //封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
    //执行延时任务
    delayedExecute(t);
    //返回任务
    return t;
}

从源代码可以看出,ScheduledThreadPoolExecutor类提供了两个重载的schedule方法,两个schedule方法的第一个参数不同。可以传递Runnable接口对象,也可以传递Callable接口对象。在方法内部,会将Runnable接口对象和Callable接口对象封装成RunnableScheduledFuture对象,本质上就是封装成ScheduledFutureTask对象。并通过delayedExecute方法来执行延时任务。

在源代码中,我们看到两个schedule都调用了decorateTask方法,接下来,我们就看看decorateTask方法。

decorateTask方法

decorateTask方法源代码如下所示。

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

通过源码可以看出decorateTask方法的实现比较简单,接收一个Runnable接口对象或者Callable接口对象和封装的RunnableScheduledFuture任务,两个方法都是将RunnableScheduledFuture任务直接返回。在ScheduledThreadPoolExecutor类的子类中可以重写这两个方法。

接下来,我们继续看下scheduleAtFixedRate方法。

scheduleAtFixedRate方法

scheduleAtFixedRate方法源代码如下所示。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    //传入的Runnable对象和TimeUnit为空,则抛出空指针异常
    if (command == null || unit == null)
        throw new NullPointerException();
    //如果执行周期period传入的数值小于或者等于0
    //抛出非法参数异常
    if (period <= 0)
        throw new IllegalArgumentException();
    //将Runnable对象封装成ScheduledFutureTask任务,
    //并设置执行周期
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    //调用decorateTask方法,本质上还是直接返回ScheduledFutureTask对象
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    //设置执行的任务
    sft.outerTask = t;
    //执行延时任务
    delayedExecute(t);
    //返回执行的任务
    return t;
}

通过源码可以看出,scheduleAtFixedRate方法将传递的Runnable对象封装成ScheduledFutureTask任务对象,并设置了执行周期,下一次的执行时间相对于上一次的执行时间来说,加上了period时长,时长的具体单位由TimeUnit决定。采用固定的频率来执行定时任务。

ScheduledThreadPoolExecutor类中另一个定时调度任务的方法是scheduleWithFixedDelay方法,接下来,我们就一起看看scheduleWithFixedDelay方法。

scheduleWithFixedDelay方法

scheduleWithFixedDelay方法的源代码如下所示。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    //传入的Runnable对象和TimeUnit为空,则抛出空指针异常
    if (command == null || unit == null)
        throw new NullPointerException();
    //任务延时时长小于或者等于0,则抛出非法参数异常
    if (delay <= 0)
        throw new IllegalArgumentException();
    //将Runnable对象封装成ScheduledFutureTask任务
    //并设置固定的执行周期来执行任务
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
    //调用decorateTask方法,本质上直接返回ScheduledFutureTask任务
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    //设置执行的任务
    sft.outerTask = t;
    //执行延时任务
    delayedExecute(t);
    //返回任务
    return t;
}

从scheduleWithFixedDelay方法的源代码,我们可以看出在将Runnable对象封装成ScheduledFutureTask时,设置了执行周期,但是此时设置的执行周期与scheduleAtFixedRate方法设置的执行周期不同。此时设置的执行周期规则为:下一次任务执行的时间是上一次任务完成的时间加上delay时长,时长单位由TimeUnit决定。也就是说,具体的执行时间不是固定的,但是执行的周期是固定的,整体采用的是相对固定的延迟来执行定时任务。

如果大家细心的话,会发现在scheduleWithFixedDelay方法中设置执行周期时,传递的delay值为负数,如下所示。

ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));

这里的负数表示的是相对固定的延迟。

在ScheduledFutureTask类中,存在一个setNextRunTime方法,这个方法会在run方法执行完任务后调用,这个方法更能体现scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的源码如下所示。

private void setNextRunTime() {
    //距离下次执行任务的时长
    long p = period;
    //固定频率执行,
    //上次执行任务的时间
    //加上任务的执行周期
    if (p > 0)
        time += p;
    //相对固定的延迟
    //使用的是系统当前时间
    //加上任务的执行周期
    else
        time = triggerTime(-p);
}

在setNextRunTime方法中通过对下次执行任务的时长进行判断来确定是固定频率执行还是相对固定的延迟。

triggerTime方法

在ScheduledThreadPoolExecutor类中提供了两个triggerTime方法,用于获取下一次执行任务的具体时间。triggerTime方法的源码如下所示。

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,则直接返回delay,否则需要处理溢出的情况。

我们看到在triggerTime方法中处理防止溢出的逻辑使用了overflowFree方法,接下来,我们就看看overflowFree方法的实现。

overflowFree方法

overflowFree方法的源代码如下所示。

private long overflowFree(long delay) {
    //获取队列中的节点
    Delayed head = (Delayed) super.getQueue().peek();
    //获取的节点不为空,则进行后续处理
    if (head != null) {
        //从队列节点中获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);
        //如果从队列中获取的延迟时间小于0,并且传递的delay
        //值减去从队列节点中获取延迟时间小于0
        if (headDelay < 0 && (delay - headDelay < 0))
            //将delay的值设置为Long.MAX_VALUE + headDelay
            delay = Long.MAX_VALUE + headDelay;
    }
    //返回延迟时间
    return delay;
}

通过对overflowFree方法的源码分析,可以看出overflowFree方法本质上就是为了限制队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,防止在ScheduledFutureTask类中的compareTo方法中溢出。

ScheduledFutureTask类中的compareTo方法的源码如下所示。

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

compareTo方法的主要作用就是对各延迟任务进行排序,距离下次执行时间靠前的任务就排在前面。

delayedExecute方法

delayedExecute方法是ScheduledThreadPoolExecutor类中延迟执行任务的方法,源代码如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果当前线程池已经关闭
    //则执行线程池的拒绝策略
    if (isShutdown())
        reject(task);
    //线程池没有关闭
    else {
        //将任务添加到阻塞队列中
        super.getQueue().add(task);
        //如果当前线程池是SHUTDOWN状态
        //并且当前线程池状态下不能执行任务
        //并且成功从阻塞队列中移除任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            //取消任务的执行,但不会中断执行中的任务
            task.cancel(false);
        else
            //调用ThreadPoolExecutor类中的ensurePrestart()方法
            ensurePrestart();
    }
}

可以看到在delayedExecute方法内部调用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源码实现如下所示。

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}

可以看到canRunInCurrentRunState方法的逻辑比较简单,就是判断线程池当前状态下能够执行任务。

另外,在delayedExecute方法内部还调用了ThreadPoolExecutor类中的ensurePrestart()方法,接下来,我们看下ThreadPoolExecutor类中的ensurePrestart()方法的实现,如下所示。

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

在ThreadPoolExecutor类中的ensurePrestart()方法中,首先获取当前线程池中线程的数量,如果线程数量小于corePoolSize则调用addWorker方法传递null和true,如果线程数量为0,则调用addWorker方法传递null和false。

关于addWork()方法的源码解析,大家可以参考【高并发专题】中的《【高并发】通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文,这里,不再赘述。

reExecutePeriodic方法

reExecutePeriodic方法的源代码如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //线程池当前状态下能够执行任务
    if (canRunInCurrentRunState(true)) {
        //将任务放入队列
        super.getQueue().add(task);
        //线程池当前状态下不能执行任务,并且成功移除任务
        if (!canRunInCurrentRunState(true) && remove(task))
            //取消任务
            task.cancel(false);
        else
            //调用ThreadPoolExecutor类的ensurePrestart()方法
            ensurePrestart();
    }
}

总体来说reExecutePeriodic方法的逻辑比较简单,但是,这里需要注意和delayedExecute方法的不同点:调用reExecutePeriodic方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务。

onShutdown方法

onShutdown方法是ThreadPoolExecutor类中的钩子函数,它是在ThreadPoolExecutor类中的shutdown方法中调用的,而在ThreadPoolExecutor类中的onShutdown方法是一个空方法,如下所示。

void onShutdown() {
}

ThreadPoolExecutor类中的onShutdown方法交由子类实现,所以ScheduledThreadPoolExecutor类覆写了onShutdown方法,实现了具体的逻辑,ScheduledThreadPoolExecutor类中的onShutdown方法的源码实现如下所示。

@Override
void onShutdown() {
    //获取队列
    BlockingQueue<Runnable> q = super.getQueue();
    //在线程池已经调用shutdown方法后,是否继续执行现有延迟任务
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    //在线程池已经调用shutdown方法后,是否继续执行现有定时任务
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
    //在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务
    if (!keepDelayed && !keepPeriodic) {
        //遍历队列中的所有任务
        for (Object e : q.toArray())
            //取消任务的执行
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        //清空队列
        q.clear();
    }
    //在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务
    else {
        //遍历队列中的所有任务
        for (Object e : q.toArray()) {
            //当前任务是RunnableScheduledFuture类型
            if (e instanceof RunnableScheduledFuture) {
                //将任务强转为RunnableScheduledFuture类型
                RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                //在线程池调用shutdown方法后不继续的延迟任务或周期任务
                //则从队列中删除并取消任务
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) {
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    //最终调用tryTerminate()方法
    tryTerminate();
}

ScheduledThreadPoolExecutor类中的onShutdown方法的主要逻辑就是先判断线程池调用shutdown方法后,是否继续执行现有的延迟任务和定时任务,如果不再执行,则取消任务并清空队列;如果继续执行,将队列中的任务强转为RunnableScheduledFuture对象之后,从队列中删除并取消任务。大家需要好好理解这两种处理方式。最后调用ThreadPoolExecutor类的tryTerminate方法。有关ThreadPoolExecutor类的tryTerminate方法的源码解析,大家可以参考【高并发专题】中的《【高并发】通过源码深度分析线程池中Worker线程的执行流程》一文,这里不再赘述。

至此,ScheduledThreadPoolExecutor类中的核心方法的源代码,我们就分析完了。

推荐阅读

实践出真知:全网最强秒杀系统架构解密,不是所有的秒杀都是秒杀!!

注意:线程的执行顺序与你想的并不一样!!

深入解析Callable接口

两种异步模型与深度解析Future接口!

解密SimpleDateFormat类的线程安全问题和六种解决方案!

不得不说的线程池与ThreadPoolExecutor类浅析

深度解析线程池中那些重要的顶层接口和抽象类

从源码角度分析创建线程池究竟有哪些方式

通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的

通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程

通过源码深度分析线程池中Worker线程的执行流程

从源码角度深度解析线程池是如何实现优雅退出的

ScheduledThreadPoolExecutor与Timer的区别和简单示例

好了,今天就到这儿吧,我是冰河,我们下期见~~

目录
相关文章
|
28天前
|
存储 Java API
详细解析HashMap、TreeMap、LinkedHashMap等实现类,帮助您更好地理解和应用Java Map。
【10月更文挑战第19天】深入剖析Java Map:不仅是高效存储键值对的数据结构,更是展现设计艺术的典范。本文从基本概念、设计艺术和使用技巧三个方面,详细解析HashMap、TreeMap、LinkedHashMap等实现类,帮助您更好地理解和应用Java Map。
48 3
|
5月前
|
缓存 开发者 索引
深入解析 `org.elasticsearch.action.search.SearchRequest` 类
深入解析 `org.elasticsearch.action.search.SearchRequest` 类
|
1月前
|
存储 编译器 数据安全/隐私保护
【C++篇】C++类与对象深度解析(四):初始化列表、类型转换与static成员详解2
【C++篇】C++类与对象深度解析(四):初始化列表、类型转换与static成员详解
31 3
|
1月前
|
编译器 C++
【C++篇】C++类与对象深度解析(四):初始化列表、类型转换与static成员详解1
【C++篇】C++类与对象深度解析(四):初始化列表、类型转换与static成员详解
47 3
|
1月前
|
安全 编译器 C++
【C++篇】C++类与对象深度解析(三):类的默认成员函数详解
【C++篇】C++类与对象深度解析(三):类的默认成员函数详解
20 3
|
1月前
|
程序员 开发者 Python
深度解析Python中的元编程:从装饰器到自定义类创建工具
【10月更文挑战第5天】在现代软件开发中,元编程是一种高级技术,它允许程序员编写能够生成或修改其他程序的代码。这使得开发者可以更灵活地控制和扩展他们的应用逻辑。Python作为一种动态类型语言,提供了丰富的元编程特性,如装饰器、元类以及动态函数和类的创建等。本文将深入探讨这些特性,并通过具体的代码示例来展示如何有效地利用它们。
38 0
|
3月前
|
缓存 Java 开发者
Spring高手之路22——AOP切面类的封装与解析
本篇文章深入解析了Spring AOP的工作机制,包括Advisor和TargetSource的构建与作用。通过详尽的源码分析和实际案例,帮助开发者全面理解AOP的核心技术,提升在实际项目中的应用能力。
47 0
Spring高手之路22——AOP切面类的封装与解析
|
3月前
|
JSON 图形学 数据格式
Json☀️ 一、认识Json是如何解析成类的
Json☀️ 一、认识Json是如何解析成类的
|
3月前
|
开发者 编解码
界面适应奥秘:从自适应布局到图片管理,Xamarin响应式设计全解析
【8月更文挑战第31天】在 Xamarin 的世界里,构建灵活且适应性强的界面是每位开发者的必修课。本文将带您探索 Xamarin 的响应式设计技巧,包括自适应布局、设备服务协商和高效图片管理,帮助您的应用在各种设备上表现出色。通过 Grid 和 StackLayout 实现弹性空间分配,利用 Device 类检测设备类型以加载最优布局,以及使用 Image 控件自动选择合适图片资源,让您轻松应对不同屏幕尺寸的挑战。掌握这些技巧,让您的应用在多变的市场中持续领先。
39 0
|
3月前
|
存储 开发者 Ruby
【揭秘Ruby高手秘籍】OOP编程精髓全解析:玩转类、继承与多态,成就编程大师之路!
【8月更文挑战第31天】面向对象编程(OOP)是通过“对象”来设计软件的编程范式。Ruby作为一种纯面向对象的语言,几乎所有事物都是对象。本文通过具体代码示例介绍了Ruby中OOP的核心概念,包括类与对象、继承、封装、多态及模块混合,展示了如何利用这些技术更好地组织和扩展代码。例如,通过定义类、继承关系及私有方法,可以创建具有特定行为的对象,并实现灵活的方法重写和功能扩展。掌握这些概念有助于提升代码质量和可维护性。
37 0

热门文章

最新文章

  • 1
    高并发场景下,到底先更新缓存还是先更新数据库?
    69
  • 2
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    75
  • 3
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    68
  • 4
    Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
    65
  • 5
    Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
    56
  • 6
    Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
    71
  • 7
    在Java中实现高并发的数据访问控制
    42
  • 8
    使用Java构建一个高并发的网络服务
    32
  • 9
    微服务06----Eureka注册中心,微服务的两大服务,订单服务和用户服务,订单服务需要远程调用我们的用,户服务,消费者,如果环境改变,硬编码问题就会随之产生,为了应对高并发,我们可能会部署成一个集
    37
  • 10
    如何设计一个秒杀系统,(高并发高可用分布式集群)
    132
  • 推荐镜像

    更多
    下一篇
    无影云桌面