Java并发编程的艺术 -- 线程池和Executor框架(第九、十章)

简介: 线程池和Executor框架(第九、十章)
本文参考于《Java并发编程的艺术》

1、线程池

1.1、为什么使用线程池?

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

1.2、线程池的实现原理

1. 线程池的主要处理流程

在这里插入图片描述

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。
  2. 如果核心线程池里的线程都在执行任务,则线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。
  3. 如果工作队列满了,则线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。
  4. 如果已经满了,则交给饱和策略来处理这个任务

1.3、excute()方法源码分析

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}
  1. 如果线程数小于核心线程数,则创建线程并执行当前任务
  2. 如线程数大于等于核心线程数或线程创建失败,则将当前任务放到工作队列中。
  3. 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
  4. 否则则通过reject()执行相应的拒绝策略的内容。

1.4、工作线程

工作线程:线程池创建线程时,会将线程封装成工作线程WorkerWorker在执行完任务后,还会循环获取工作队列里的任务来执行

图示说明

在这里插入图片描述

  • execute()方法中创建一个线程时,会让这个线程执行当前任务。
  • 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行

1.5、线程池的创建

源代码

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
            throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize核心线程池的基本大小):线程池允许创建的最小线程数量。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
  2. runnableTaskQueue任务队列):用于保存等待执行的任务的阻塞队列
  • runnableTaskQueue任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  1. maximumPoolSize线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果
  2. ThreadFactory:用于设置创建线程的工厂。
  3. RejectedExecutionHandler饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

    • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。
  1. keepAliveTime线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

1.6、execute()和submit()方法

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的 get()方法来获取返回值get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

submit()源码

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

1.7、关闭线程池

1. 关闭线程池方法说明

  • 可以通过调用线程池的shutdownshutdownNow方法来关闭线程池。
  • 它们的原理遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
  • 但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
  • 通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法

2. 返回关闭状态的方法说明

  • 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true
  • 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true

2、Executor框架

2.1、Executor框架简介

2.1.1、Executor框架的两级调度模型

在这里插入图片描述

  • 在上层Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;
  • 在底层,操作系统内核将这些线程映射到硬件处理器上。

2.1.2、Executor框架的结构

1. 结构

  1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
  2. 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。
  3. 异步计算的结果:包括接口Future和实现Future接口的FutureTask类。

2. 主要的类和接口

  1. Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  2. ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  3. ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  4. Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  5. Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

2.1.3、 Executor框架的使用

在这里插入图片描述

  1. 主线程首先要创建实现Runnable或者Callable接口的任务对象
  2. 然后可以把Runnable对象直接交给ExecutorService执行,或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。
  3. ExecutorService将返回一个实现Future接口的对象。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
  4. 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

2.1.4、Executor框架的成员

  1. ThreadPoolExecutor
  • FixedThreadPool:创建使用固定线程数的FixedThreadPool。
  • SingleThreadExecutor:创建使用单个线程的SingleThreadExecutor。
  • CachedThreadPool:创建一个会根据需要创建新线程的CachedThreadPool。
  1. ScheduledThreadPoolExecutor
  • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor。
  • SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。
  1. Future接口:用来表示异步计算的结果
  2. Runnable接口和Callable接口

2.2、ThreadPoolExecutor

2.2.1、参数详解

  • corePool:核心线程池的大小。
  • maximumPool:最大线程池的大小。
  • BlockingQueue:用来暂时保存任务的工作队列。
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。

2.2.2、FixedThreadPool

1. 简介

FixedThreadPool被称为可重用固定线程数的线程池

2. execute()方法

在这里插入图片描述

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

3. 其使用无界队列LinkedBlockingQueue带来的影响

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
  2. 使用无界队列时maximumPoolSize将是一个无效参数
  3. 使用无界队列时keepAliveTime将是一个无效参数
  4. 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

2.2.3、SingleThreadExecutor

1. 简介

  • SingleThreadExecutor是使用单个worker线程的Executor
  • SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1
  • 其他参数与FixedThreadPool相同。
  • SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同

2. execute()方法

在这里插入图片描述

  1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
  2. 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加LinkedBlockingQueue。
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

2.2.4、CachedThreadPool

1. 简介

  • CachedThreadPool一个会根据需要创建新线程的线程池
  • CachedThreadPool的 corePoolSize被设置为0,即corePool为空。
  • maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。
  • 这里把 keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
  • CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。

2. execute()方法

在这里插入图片描述

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成。否则就执行步骤2。
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤(1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成
  3. 在步骤(2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源

2.3、ScheduledThreadPoolExecutor

2.3.1、简介

  • 它主要用来在给定的延迟之后运行任务,或者定期执行任务
  • ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数

2.3.2、运行机制

  • 使用DelayQueue作为任务队列。DelayQueue是一个无界队列,所以ThreadPoolExecutor的 maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义。DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

1. 包含的成员变量

  • long型成员变量time,表示这个任务将要被执行的具体时间
  • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • long型成员变量period,表示任务执行的间隔周期。

2. 任务执行步骤

在这里插入图片描述

  1. 线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())到期任务是指ScheduledFutureTask的time大于等于当前时间
  2. 线程1执行这个ScheduledFutureTask。
  3. 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
  4. 线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())

2.4、FutureTask

2.4.1、简介

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

1. FutureTask的状态

在这里插入图片描述

  1. 未启动FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
  2. 已启动FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  3. 已完成FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(..)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

2. get方法和cancel方法的执行

在这里插入图片描述

  • 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
  • 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);
  • 当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。
相关文章
|
2天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
90 60
【Java并发】【线程池】带你从0-1入门线程池
|
16天前
|
存储 缓存 Java
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
59 3
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
|
13天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
72 14
|
16天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
48 13
|
17天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
18天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
42 17
|
27天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
54 26
|
2月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
239 2
|
3月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
3月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####