1.简介
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor
来启动线程比使用 Thread
的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
2.Executor 框架结构(主要由三大部分组成)
1) 任务( Runnable / Callable )
执行任务需要实现的Runnable
接口或Callable
接口。Runnable
接口或Callable
接口实现类都可以被 ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。
2) 任务的执行( Executor )
如下图所示,包括任务执行机制的核心接口Executor
,以及继承自Executor
接口的
ExecutorService
接口。ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
注意: 通过查看
ScheduledThreadPoolExecutor
源代码我们发现ScheduledThreadPoolExecutor
实际上是继承了ThreadPoolExecutor
并实现了ScheduledExecutorService
,而ScheduledExecutorService
又实现了ExecutorService
,正如我们下面给出的类关系图显示的一样。
ThreadPoolExecutor
类描述:
//AbstractExecutorService实现了ExecutorService接口 public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor
类描述:
//ScheduledExecutorService实现了ExecutorService接口 public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
3) 异步计算的结果( Future )
Future
接口以及Future
接口的实现类FutureTask
类都可以代表异步计算的结果。
当我们把Runnable
接口或Callable
接口的实现类提交给 ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。(调用submit()
方法时会返回一个FutureTask
对象)
3.Executor 框架的使用示意图
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 - 把创建完成的实现
Runnable / Callable
接口的对象直接交给ExecutorService
执行:ExecutorService.execute(Runnable command))
或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行( ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task))
。 - 如果执行
ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(我们刚刚也提到过了执行execute()
方法和submit()
方法的区别,submit()
会返回一个FutureTask
对象)。由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
三、(重要)ThreadPoolExecutor类简单介绍
线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。
1.ThreadPoolExecutor 类分析
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方
法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
/** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 */ public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量 int maximumPoolSize,//线程池的最大线程数 long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//任务队列,用来储 存等待执行任务的队列 ThreadFactory threadFactory,//线程工厂,用来创建线程, 一般默认即可 RejectedExecutionHandler handler//拒绝策略,当提交的 任务过多而不能及时处理时,我们可以定制策略来处理任务 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。
ThreadPoolExecutor
3个最重要的参数:
corePoolSize
: 核心线程数线程数定义了最小可以同时运行的线程数量。maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
ThreadPoolExecutor
其他常见参数:
keepAliveTime
:当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会
被回收销毁;unit
:keepAliveTime
参数的时间单位。threadFactory
:executor
创建新线程的时候会用到。handler
:饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解
ThreadPoolExecutor
饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,
ThreadPoolTaskExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过
ThreadPoolTaskExecutor
或者我们直接通过ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是ThreadPoolExecutor.AbortPolicy
。在默认情况下,
ThreadPoolExecutor
将抛出RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用
ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看ThreadPoolExecutor
的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
2.推荐使用 ThreadPoolExecutor 构造函数创建线程池
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
强制线程池不允许使用 Executors 去创建,而是通过ThreadPoolExecutor构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
FixedThreadPool
和SingleThreadExecutor
: 允许请求的队列长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致OOM。- CachedThreadPool和ScheduledThreadPool: 允许创建的线程数量为Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
方式一:通过ThreadPoolExecutor
构造函数实现(推荐)
方式二:通过 Executor 框架的工具类 Executors 来实现
我们可以创建三种类型的 ThreadPoolExecutor:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
对应 Executors 工具类中的方法如图所示:
四、 (重要)ThreadPoolExecutor 使用示例
我们上面讲解了Executor
框架以及ThreadPoolExecutor
类,下面让我们实战一下,来通过写一个ThreadPoolExecutor
的小Demo来回顾上面的内容。
1.示例代码:Runnable+ ThreadPoolExecutor
首先创建一个Runnable
接口的实现类(当然也可以是Callable
接口,我们上面也说了两者的区别。)
MyRunnable.java
import java.util.Date; /** * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 * @author shuang.kou */ public class MyRunnable implements Runnable { private String command; public MyRunnable(String s) { this.command = s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); processCommand(); System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return this.command; } }
编写测试程序,我们这里以阿里巴巴推荐的使用ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
ThreadPoolExecutorDemo.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Runnable worker = new MyRunnable("" + i); //执行Runnable executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
可以看到我们上面的代码指定了:
1.corePoolSize
: 核心线程数为 5。
2.maximumPoolSize
:最大线程数 10
3.keepAliveTime
: 等待时间为 1L。
4.unit
: 等待时间的单位为TimeUnit.SECONDS
。
5.workQueue
:任务队列为ArrayBlockingQueue
,并且容量为 100;
6.handler
:饱和策略为CallerRunsPolicy
。
Output:
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020 pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020 pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020 pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020 pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020 pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
2.线程池原理分析
承接上一节,我们通过代码输出结果可以看出:线程首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
**为了搞懂线程池的原理,我们需要首先分析一下execute
方法。**在上一节中的Demo中我们使用executor.execute(worker)
来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } //任务队列 private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面会涉及到 3 步 操作 // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添 加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于RUNNING状态才会被并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是RUNNING状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 如果当前线程池为空就新创建一个线程并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false)) reject(command); }
通过下图可以更好的对上面这 3 步做一个展示
addWorker
这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回的就是false。
// 全局锁,并发操作必备 private final ReentrantLock mainLock = new ReentrantLock(); // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合 private int largestPoolSize; // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才 能访问此集合 private final HashSet<Worker> workers = new HashSet<>(); //获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //判断线程池的状态是否为 Running private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * 添加新的工作线程到线程池 * @param firstTask 要执行 * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小 * @return 添加成功就返回true否则返回false */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //这两句用来获取线程池的状态 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取线程池中线程的数量 int wc = workerCountOf(c); // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //原子操作将workcount的数量加1 if (compareAndIncrementWorkerCount(c)) break retry; // 如果线程的状态改变了就再次执行上述操作 c = ctl.get(); if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池状态 int rs = runStateOf(ctl.get()); //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中 //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是 RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启 动新的Worker // firstTask == null证明只新建线程而不执行任务 if (rs < SHUTDOWN || ( rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //更新当前工作线程的最大容量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 工作线程是否启动成功 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方 法启动真实的线程实例 if (workerAdded) { t.start(); /// 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程中移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
现在,让我们在回到上一节我们写的Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?
没搞懂的话,也没关系,可以看看我的分析:
我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
3.几个常见的对比
1)Runnable vs Callable
Runnable
自Java 1.0以来一直存在,但Callable
仅在 Java1.5中引入,目的就是为了来处理Runnable
不支持的用例。 Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用Runnable
接口,这样代码看起来会更加简洁。
工具类Executors
可以实现Runnable
对象和Callable
对象之间的相互转换。(Executors.callable
(Runnable task
)或Executors.callable
(Runnable task
,Object resule
) )。
Runnable.java
@FunctionalInterface public interface Runnable { /** * 被线程执行,没有返回值也无法抛出异常 */ public abstract void run(); }
Callable.java
@FunctionalInterface public interface Callable<V> { /** * 计算结果,或在无法这样做时抛出异常。 * @return 计算得出的结果 * @throws 如果无法计算结果,则抛出异常 */ V call() throws Exception; }
2) execute() vs submit()
1.execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
最后
笔者已经把面试题和答案整理成了面试专题文档,有想获取到借鉴参考的朋友:点赞关注后,戳这里即可免费领取
ows Exception;
}
### 2) execute() vs submit() 1.`execute()`**方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;** ### 最后 > **笔者已经把面试题和答案整理成了面试专题文档,有想获取到借鉴参考的朋友:点赞关注后,[戳这里即可免费领取](https://gitee.com/vip204888/java-p7)** [外链图片转存中...(img-l7Pn5g6N-1628081071602)] [外链图片转存中...(img-DA2LGTbO-1628081071603)] [外链图片转存中...(img-xngqqc07-1628081071604)] [外链图片转存中...(img-4LD36F9v-1628081071605)] [外链图片转存中...(img-2M1T7HqH-1628081071606)] ![image](https://ucc.alicdn.com/images/user-upload-01/img_convert/8a02592cd22916e3e750d045a5ee6d00.png)