【多线程】线程池 | ScheduledThreadPoolExecutor

简介: ScheduledThreadPoolExecutor继承ThreadPoolExecutor对execute和submit进行了重写,同时也实现了ScheduledExecutorService特有的方法。

1. 概要

ScheduledThreadPoolExecutor继承ThreadPoolExecutor对execute和submit进行了重写,

同时也实现了ScheduledExecutorService特有的方法。

其主要的目的就是为了实现周期性执行任务或给定时间延后执行异步任务

网络异常,图片无法展示
|

这个类中还有两个重要的内部类

  1. DelayWorkQueue
    主要实现了阻塞队列的接口,可以看出他是一个专门定制的阻塞队列,
    网络异常,图片无法展示
    |

  2. ScheduledFutureTask
    ScheduledFutureTask具有FutureTask类的所有功能,并实现了RunnableScheduledFuture接口的所有方法。
    ScheduledFutureTask类的定义如下所示
    网络异常,图片无法展示
    |

2. 构造方法

//最大线程数是Integer.MAX_VALUE,理论上是一个无限大的线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

3. 独有方法

//创建并执行一次操作,该操作在给定的延迟后启用。
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);
//这里传入的是实现Callable接口的任务                           
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);  
//执行时间将在initialDelay+period后开始执行
//进行检测上一次任务,上次一任务执行完毕,下一个任务才会执行                                                   
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);  
//initialDelay时间之后开始执行周期性任务                                              
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);    

4. 实践

没有实践

public static void schedule(ScheduledExecutorService executorService){
    executorService.schedule(() ->{
        for (int i= 0 ; i <10 ;i++) {
            executorService.submit(() -> {
                System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
            });
        }
    },5, TimeUnit.SECONDS);
}
public static void scheduleCall(ScheduledExecutorService executorService){
    ScheduledFuture scheduledFuture = executorService.schedule(() -> "scheduleCall",1L,TimeUnit.SECONDS);
    try {
        System.out.println(scheduledFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
public static void scheduleAtFixedRate(ScheduledExecutorService executorService) {
    ScheduledFuture scheduledFuture = executorService.scheduleAtFixedRate(() -> {
                for (int i = 0; i < 10; i++) {
                    executorService.submit(() -> {
                        System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
                    });
                }
            }
            , 0, 5L, TimeUnit.SECONDS);
    try {
        System.out.println(scheduledFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
//针对异常情况,会打断定时任务的执行
public static void scheduleWithFixedDelay(ScheduledExecutorService executorService) {
    ScheduledFuture scheduledFuture = executorService.scheduleWithFixedDelay(() -> {
                try {
                    int j = 1 / 0;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            , 10, 5L, TimeUnit.SECONDS);
}

5. 源码环节

参数说明: command:要执行的任务 delay:延迟时间 unit:时间单位

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //将提交的任务转换成ScheduledFutureTask,如果period为true则是周期性任务
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
   //任务的主要执行方法                                   
    delayedExecute(t);
    return t;
}

ScheduledThreadPoolExecutor主要就是完成延时或周期性的任务,delayedExecute方法就是主要的实现, 最后通过ensurePrestart方法,添加线程启动任务。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
    //如果线程池已经停止,则调用拒绝策略
        reject(task);
    else {
        //将任务添加入Worker阻塞队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            //取消并删除任务
            task.cancel(false);
        else
        //至少启动一个线程去执行
            ensurePrestart();
    }
}
void ensurePrestart() {
    //通过底29位获取线程池中的线程数量
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

又看到我们熟悉的addWorker方法,我们可以翻阅前面的文章来了解下主要的执行流程。 addWorker的主要流程:

  1. 检查线程池状态
  2. 新建线程,使用Worker进行包装,放入Hashet数组中,最终真正执行任务的线程就放在Worker,所以新增一个addWorker就是新增一个线程。主要实现复用就是Worker类中的runWorker(this)
  3. 启动线程Start()
  4. 添加失败操作,移除Worker,减少WorkerCount

6.总结

总结

主要的三个对象

执行者:Worker

任务:ScheduledFutureTask

执行结果:ScheduledFuture

流程:

  1. 将提交的任务转换成ScheduledFutureTask
  2. 将ScheduledFutureTask添加到Worker队列中
  3. 调用addWorker方法,调用它的核心方法runWorker
  4. 调用getTask方法从阻塞队列中不断的去获取任务进行执行,直到从阻塞队列中获取的任务为 null 的话,线程结束终止
相关文章
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
109 38
|
18天前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
19 4
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
51 1
C++ 多线程之初识多线程
|
1月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
69 2
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
77 4
|
1月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
199 2
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
23 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
20 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
34 2
|
2月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
39 1