【多线程】线程池 | ScheduledThreadPoolExecutor

简介: ScheduledThreadPoolExecutor继承ThreadPoolExecutor对execute和submit进行了重写,同时也实现了ScheduledExecutorService特有的方法。

1. 概要

ScheduledThreadPoolExecutor继承ThreadPoolExecutor对execute和submit进行了重写,

同时也实现了ScheduledExecutorService特有的方法。

其主要的目的就是为了实现周期性执行任务或给定时间延后执行异步任务

网络异常,图片无法展示
|

这个类中还有两个重要的内部类

  1. DelayWorkQueue
    主要实现了阻塞队列的接口,可以看出他是一个专门定制的阻塞队列,
    网络异常,图片无法展示
    |

  2. ScheduledFutureTask
    ScheduledFutureTask具有FutureTask类的所有功能,并实现了RunnableScheduledFuture接口的所有方法。
    ScheduledFutureTask类的定义如下所示
    网络异常,图片无法展示
    |

2. 构造方法

//最大线程数是Integer.MAX_VALUE,理论上是一个无限大的线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

3. 独有方法

//创建并执行一次操作,该操作在给定的延迟后启用。
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);
//这里传入的是实现Callable接口的任务                           
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);  
//执行时间将在initialDelay+period后开始执行
//进行检测上一次任务,上次一任务执行完毕,下一个任务才会执行                                                   
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);  
//initialDelay时间之后开始执行周期性任务                                              
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);    

4. 实践

没有实践

public static void schedule(ScheduledExecutorService executorService){
    executorService.schedule(() ->{
        for (int i= 0 ; i <10 ;i++) {
            executorService.submit(() -> {
                System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
            });
        }
    },5, TimeUnit.SECONDS);
}
public static void scheduleCall(ScheduledExecutorService executorService){
    ScheduledFuture scheduledFuture = executorService.schedule(() -> "scheduleCall",1L,TimeUnit.SECONDS);
    try {
        System.out.println(scheduledFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
public static void scheduleAtFixedRate(ScheduledExecutorService executorService) {
    ScheduledFuture scheduledFuture = executorService.scheduleAtFixedRate(() -> {
                for (int i = 0; i < 10; i++) {
                    executorService.submit(() -> {
                        System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
                    });
                }
            }
            , 0, 5L, TimeUnit.SECONDS);
    try {
        System.out.println(scheduledFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
//针对异常情况,会打断定时任务的执行
public static void scheduleWithFixedDelay(ScheduledExecutorService executorService) {
    ScheduledFuture scheduledFuture = executorService.scheduleWithFixedDelay(() -> {
                try {
                    int j = 1 / 0;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            , 10, 5L, TimeUnit.SECONDS);
}

5. 源码环节

参数说明: command:要执行的任务 delay:延迟时间 unit:时间单位

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //将提交的任务转换成ScheduledFutureTask,如果period为true则是周期性任务
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
   //任务的主要执行方法                                   
    delayedExecute(t);
    return t;
}

ScheduledThreadPoolExecutor主要就是完成延时或周期性的任务,delayedExecute方法就是主要的实现, 最后通过ensurePrestart方法,添加线程启动任务。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
    //如果线程池已经停止,则调用拒绝策略
        reject(task);
    else {
        //将任务添加入Worker阻塞队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            //取消并删除任务
            task.cancel(false);
        else
        //至少启动一个线程去执行
            ensurePrestart();
    }
}
void ensurePrestart() {
    //通过底29位获取线程池中的线程数量
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

又看到我们熟悉的addWorker方法,我们可以翻阅前面的文章来了解下主要的执行流程。 addWorker的主要流程:

  1. 检查线程池状态
  2. 新建线程,使用Worker进行包装,放入Hashet数组中,最终真正执行任务的线程就放在Worker,所以新增一个addWorker就是新增一个线程。主要实现复用就是Worker类中的runWorker(this)
  3. 启动线程Start()
  4. 添加失败操作,移除Worker,减少WorkerCount

6.总结

总结

主要的三个对象

执行者:Worker

任务:ScheduledFutureTask

执行结果:ScheduledFuture

流程:

  1. 将提交的任务转换成ScheduledFutureTask
  2. 将ScheduledFutureTask添加到Worker队列中
  3. 调用addWorker方法,调用它的核心方法runWorker
  4. 调用getTask方法从阻塞队列中不断的去获取任务进行执行,直到从阻塞队列中获取的任务为 null 的话,线程结束终止
相关文章
|
1天前
|
存储 测试技术
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
9 0
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
|
2天前
|
数据采集 Java Unix
10-多线程、多进程和线程池编程(2)
10-多线程、多进程和线程池编程
|
2天前
|
安全 Java 调度
10-多线程、多进程和线程池编程(1)
10-多线程、多进程和线程池编程
|
7天前
|
存储 Linux C语言
c++进阶篇——初窥多线程(二) 基于C语言实现的多线程编写
本文介绍了C++中使用C语言的pthread库实现多线程编程。`pthread_create`用于创建新线程,`pthread_self`返回当前线程ID。示例展示了如何创建线程并打印线程ID,强调了线程同步的重要性,如使用`sleep`防止主线程提前结束导致子线程未执行完。`pthread_exit`用于线程退出,`pthread_join`用来等待并回收子线程,`pthread_detach`则分离线程。文中还提到了线程取消功能,通过`pthread_cancel`实现。这些基本操作是理解和使用C/C++多线程的关键。
|
10天前
|
安全 Java
【极客档案】Java 线程:解锁生命周期的秘密,成为多线程世界的主宰者!
【6月更文挑战第19天】Java多线程编程中,掌握线程生命周期是关键。创建线程可通过继承`Thread`或实现`Runnable`,调用`start()`使线程进入就绪状态。利用`synchronized`保证线程安全,处理阻塞状态,注意资源管理,如使用线程池优化。通过实践与总结,成为多线程编程的专家。
|
9天前
|
Java 开发者
告别单线程时代!Java 多线程入门:选继承 Thread 还是 Runnable?
【6月更文挑战第19天】在Java中,面对多任务需求时,开发者可以选择继承`Thread`或实现`Runnable`接口来创建线程。`Thread`继承直接但限制了单继承,而`Runnable`接口提供多实现的灵活性和资源共享。多线程能提升CPU利用率,适用于并发处理和提高响应速度,如在网络服务器中并发处理请求,增强程序性能。不论是选择哪种方式,都是迈向高效编程的重要一步。
|
9天前
|
Java 开发者
震惊!Java多线程的惊天秘密:你真的会创建线程吗?
【6月更文挑战第19天】Java多线程创建有两种主要方式:继承Thread类和实现Runnable接口。继承Thread限制了多重继承,适合简单场景;实现Runnable接口更灵活,可与其它继承结合,是更常见选择。了解其差异对于高效、健壮的多线程编程至关重要。
|
11天前
|
Java 程序员
Java多线程编程是指在一个进程中创建并运行多个线程,每个线程执行不同的任务,并行地工作,以达到提高效率的目的
【6月更文挑战第18天】Java多线程提升效率,通过synchronized关键字、Lock接口和原子变量实现同步互斥。synchronized控制共享资源访问,基于对象内置锁。Lock接口提供更灵活的锁管理,需手动解锁。原子变量类(如AtomicInteger)支持无锁的原子操作,减少性能影响。
20 3
|
9天前
|
Java
JAVA多线程深度解析:线程的创建之路,你准备好了吗?
【6月更文挑战第19天】Java多线程编程提升效率,通过继承Thread或实现Runnable接口创建线程。Thread类直接继承启动简单,但限制多继承;Runnable接口实现更灵活,允许类继承其他类。示例代码展示了两种创建线程的方法。面对挑战,掌握多线程,让程序高效运行。
|
10天前
|
Java 调度
【实战指南】Java多线程高手秘籍:线程生命周期管理,掌控程序命运的钥匙!
【6月更文挑战第19天】Java多线程涉及线程生命周期的五个阶段:新建、就绪、运行、阻塞和死亡。理解这些状态转换对性能优化至关重要。线程从新建到调用`start()`变为就绪,等待CPU执行。获得执行权后进入运行状态,执行`run()`。遇到阻塞如等待锁时,进入阻塞状态。完成后或被中断则死亡。管理线程包括合理使用锁、利用线程池、处理异常和优雅关闭线程。通过控制这些,能编写更高效稳定的多线程程序。

热门文章

最新文章