本文参考于《Java并发编程的艺术》
1、线程池
1.1、为什么使用线程池?
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
1.2、线程池的实现原理
1. 线程池的主要处理流程
- 线程池
判断核心线程池里的线程是否都在执行任务
。如果不是,则创建一个新的工作线程来执行任务。 - 如果核心线程池里的线程都在执行任务,则
线程池判断工作队列是否已经满
。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。 - 如果工作队列满了,则
线程池判断线程池的线程是否都处于工作状态
。如果没有,则创建一个新的工作线程来执行任务。 - 如果已经满了,则交给饱和策略来处理这个任务。
1.3、excute()方法源码分析
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
- 如果线程数小于核心线程数,则创建线程并执行当前任务
- 如线程数大于等于核心线程数或线程创建失败,则将当前任务放到工作队列中。
- 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
- 否则则通过
reject()
执行相应的拒绝策略的内容。
1.4、工作线程
工作线程:线程池创建线程时,会将线程封装成工作线程Worker
,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。
图示说明
- 在
execute()
方法中创建一个线程时,会让这个线程执行当前任务。 - 这个线程执行完上图中1的任务后,
会反复从BlockingQueue获取任务来执行
。
1.5、线程池的创建
源代码
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
(核心线程池的基本大小):线程池允许创建的最小线程数量。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。runnableTaskQueue
(任务队列):用于保存等待执行的任务的阻塞队列
。
- runnableTaskQueue(
任务队列
):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。 - ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
maximumPoolSize
(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果
。ThreadFactory
:用于设置创建线程的工厂。RejectedExecutionHandler
(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy
,表示无法处理新任务时抛出异常。- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
keepAliveTime
(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
1.6、execute()和submit()方法
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit()
方法用于提交需要返回值的任务。线程池会返回一个future
类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
submit()源码
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
1.7、关闭线程池
1. 关闭线程池方法说明
- 可以通过调用线程池的
shutdown
或shutdownNow
方法来关闭线程池。 - 它们的原理是
遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程
,所以无法响应中断的任务可能永远无法终止。 - 但是它们存在一定的区别,
shutdownNow
首先将线程池的状态设置成STOP
,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown
只是将线程池的状态设置成SHUTDOWN
状态,然后中断所有没有正在执行任务的线程。 - 通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用
shutdownNow
方法。
2. 返回关闭状态的方法说明
- 只要调用了这两个关闭方法中的任意一个,
isShutdown
方法就会返回true。 - 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用
isTerminaed
方法会返回true。
2、Executor框架
2.1、Executor框架简介
2.1.1、Executor框架的两级调度模型
- 在上层,
Java多线程程序通常把应用分解为若干个任务
,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程; - 在底层,操作系统内核将这些线程映射到硬件处理器上。
2.1.2、Executor框架的结构
1. 结构
- 任务:包括被执行任务需要实现的接口:
Runnable
接口或Callable
接口。 - 任务的执行:包括任务执行机制的核心接口
Executor
,以及继承自Executor的ExecutorService
接口。 - 异步计算的结果:包括接口
Future
和实现Future接口的FutureTask
类。
2. 主要的类和接口
Executor
是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务。ScheduledThreadPoolExecutor
是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。Runnable
接口和Callable
接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
2.1.3、 Executor框架的使用
- 主线程首先要
创建实现Runnable或者Callable接口的任务对象
。 - 然后可以
把Runnable对象直接交给ExecutorService执行
,或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。 ExecutorService将返回一个实现Future接口的对象
。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。- 最后,
主线程可以执行FutureTask.get()方法来等待任务执行完成
。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
2.1.4、Executor框架的成员
- ThreadPoolExecutor
FixedThreadPool
:创建使用固定线程数的FixedThreadPool。SingleThreadExecutor
:创建使用单个线程的SingleThreadExecutor。CachedThreadPool
:创建一个会根据需要创建新线程的CachedThreadPool。
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
:包含若干个线程的ScheduledThreadPoolExecutor。SingleThreadScheduledExecutor
。只包含一个线程的ScheduledThreadPoolExecutor。
- Future接口:用来表示异步计算的结果
- Runnable接口和Callable接口
2.2、ThreadPoolExecutor
2.2.1、参数详解
- corePool:核心线程池的大小。
- maximumPool:最大线程池的大小。
- BlockingQueue:用来暂时保存任务的工作队列。
- RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。
2.2.2、FixedThreadPool
1. 简介
FixedThreadPool
被称为可重用固定线程数的线程池。
2. execute()方法
- 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
- 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入
LinkedBlockingQueue
。 - 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
3. 其使用无界队列LinkedBlockingQueue带来的影响
- 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
- 使用无界队列时
maximumPoolSize将是一个无效参数
。 - 使用无界队列时
keepAliveTime将是一个无效参数
。 - 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。
2.2.3、SingleThreadExecutor
1. 简介
SingleThreadExecutor
是使用单个worker线程的Executor。- SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。
- 其他参数与FixedThreadPool相同。
- SingleThreadExecutor使用
无界队列LinkedBlockingQueue
作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同
2. execute()方法
- 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
- 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加LinkedBlockingQueue。
- 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
2.2.4、CachedThreadPool
1. 简介
CachedThreadPool
是一个会根据需要创建新线程的线程池。- CachedThreadPool的
corePoolSize
被设置为0,即corePool为空。 maximumPoolSize
被设置为Integer.MAX_VALUE,即maximumPool是无界的。- 这里把
keepAliveTime
设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。 - CachedThreadPool使用没有容量的
SynchronousQueue
作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。
2. execute()方法
- 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,
主线程把任务交给空闲线程执行
,execute()方法执行完成。否则就执行步骤2。 - 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤(1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。 - 在步骤(2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源
2.3、ScheduledThreadPoolExecutor
2.3.1、简介
它主要用来在给定的延迟之后运行任务,或者定期执行任务
。ScheduledThreadPoolExecutor
可以在构造函数中指定多个对应的后台线程数。
2.3.2、运行机制
- 使用
DelayQueue
作为任务队列。DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize
在ScheduledThreadPoolExecutor中没有什么意义。DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)
。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
1. 包含的成员变量
- long型成员变量time,表示
这个任务将要被执行的具体时间
。 - long型成员变量sequenceNumber,表示这个任务被添加到
ScheduledThreadPoolExecutor中的序号
。 - long型成员变量period,表示任务执行的间隔周期。
2. 任务执行步骤
- 线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。
到期任务是指ScheduledFutureTask的time大于等于当前时间
。 - 线程1执行这个ScheduledFutureTask。
- 线程1
修改ScheduledFutureTask的time变量
为下次将要被执行的时间。 - 线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
2.4、FutureTask
2.4.1、简介
Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。
1. FutureTask的状态
- 未启动:
FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态
。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。 - 已启动:
FutureTask.run()方法被执行的过程中
,FutureTask处于已启动状态。 - 已完成:
FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(..))
,或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。
2. get方法和cancel方法的执行
- 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
- 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
- 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;
- 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;
- 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);
- 当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。