1. 初识多线程
相信大家使用多线程就会经常看到这样的代码,如下
public class EveryTaskOneThread {
public static void main(String[] args) {
for (int i = 0; i < 10; ++i) {
Thread thread = new Thread(new Task());
thread.start();
}
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
当任务数量上升到1000+,这样内存开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题。
1.1 为什么要使用线程池?
- 反复创建线程开销大
- 过多的线程会占用太多内存
1.2 线程池的好处
- 加快响应速度;
- 合理利用
CPU
和内存 - 统一管理资源
1.3 线程池适合应用的场景
1.服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
2.实际在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理
2. 线程池必须记住的参数
线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
corePoolSize
指的是核心线程数
- 创建的核心线程即使在空闲时,也不会被回收。
maxPoolSize
指的是最大线程数
- 在核心线程数的基础上,额外增加非核心线程,核心线程数+非核心线程数不能超过
maxPoolSize
keepAliveTime
指的是回收非核心线程的等待时间
- 如果线程池当前的线程数多于
corePoolSize
,那么如果多余的非核心线程线程空闲时间超过keepAliveTime
,它们就会被终止
handler
是一种拒绝策略
- 拒绝策略可以在任务满了之后,拒绝执行多余的任务
3. 线程池添加线程的规则
当此时新来一个任务需要执行,线程池会怎么处理?
1.如果线程数小于corePoolSize
,即使其他工作线程处于空闲状态,也会创建一个新核心线程来运行新任务。
2.如果线程数等于(或大于)corePoolSize
但少于maximumPoolSize
,则将任务放入队列。
3.如果队列已满,并且线程数小于maxPoolSize
,则创建一个新非核心线程来运行任务。
4.如果队列已满,并且线程数大于或等于maxPoolSize
,则拒绝该任务。
总结:
是否需要增加线程的判断顺序是:1、corePoolSize
2、workQueue
3、maxPoolSize
举个例子:
线程池的核心线程数corePoolSize
大小为5,最大池maxPoolSize
大小为10,队列workQueue
为100。
因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到达到100。当队列已满时,将创建新的线程,最多到10个线程,如果再来任务,就拒绝。
4. 增减线程的特点
- 通过设置
corePoolSize
和maximumPoolSize
相同,就可以创建固定大小的线程池。 - 我们使用线程池一般希望保持较少的线程数,并且只有在负载变得很大时才增加它。
- 通过设置
maximumPoolSize
为很高的值,例如Integer.MAX_VALUE
,可以允许线程池容纳任意数量的并发任务。 - 只有在队列填满时才创建多于
corePoolSize
的非核心线程,如果使用的是无界队列(例如LinkedBlockingQueue
),那么线程数就不会超过corePoolSize
。
5. ThreadFactory 用来创建线程
新的线程是由
ThreadFactory
创建的,默认使用Executors.defaultThreadFactory()
创建出来的线程都在同一个线程组,拥有同样的
NORM_PRIORITY
优先级并且都不是守护线程。如果自己指定
ThreadFactory
,那么就可以改变线程名、线程组、优先级、是否是守护线程等。通常使用默认的
ThreadFactory
就可以了
Executors
部分代码
// 验证第一点,新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
......
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 验证第二点,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
6. 守护线程
作用:给用户线程提供服务
在Java
中有两类线程:User Thread
(用户线程)、Daemon Thread
(守护线程)
6.1 守护线程的分类标准
分类标准是线程是否会阻止JVM
的停止——只要当前JVM
实例中尚存在任何一个非守护线程没有结束,守护线程就全部继续工作;只有当最后一个非守护线程结束时,所有守护线程才会随着JVM
一同结束工作。(非守护线程等同于用户线程)
我们知道,Java
虚拟机通常会继续执行线程,直到发生以下两种中的任一情况时,Java
程序才能运行结束:
- 已调用
System.exit()
方法 - 所有非守护程序线程的线程都已结束而一般情况下我们不会调用
System.exit()
方法,所以大部分的Java
程序的结束都是由于所有用户线程都结束而导致的。
所以可以认为,任何一个守护线程都是整个JVM
中所有用户线程(非守护线程)的管家。Daemon
的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC
(垃圾回收器),它是一个很称职的守护者。守护线程的特性线程类型默认继承自父线程,守护线程创建的线程为默认是守护线程,同样,用户线程创建的线程默认为用户线程。非守护线程如果想创建一个守护线程,需要调用Thread.setDaemon
来设置它(Thread
类用布尔值daemon
属性来表示线程是否是守护线程),并且,该方法必须在start
之前调用,否则会抛出 IllegalThreadStateException
异常。
6.2 守护线程被谁启动
通常由JVM
启动,而不是由用户去启动。
当JVM
启动时,通常会有一个非守护线程(通常为执行main
函数的线程)。不影响JVM
退出当只剩下守护线程时,JVM
就会退出,因为如果只剩下守护线程,就没必要继续运行程序了。守护线程没结束并不会影响JVM
的正常停止:假设所有用户线程都结束了,那么就算有5个守护线程正在运行,JVM
也会正常停止:守护线程和普通线程的区别User
和Daemon
两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread
已经全部退出运行了,只剩下Daemon Thread
存在了,虚拟机也就退出了,这是因为没有了“被守护者”,Daemon
也就没有工作可做了,也就没有继续运行程序的必要了。这就是守护线程的作用:告诉JVM
不需要等待它退出,当JVM
中所有的线程都是守护线程的时候,JVM
就可以正常的退出了。
6.3 是否需要给线程设置为守护线程?
我们通常不应把自己的线程设置为守护线程,因为设置为守护线程是很危险的。比如线程正在访问如文件、数据库的时候,所有用户线程都结束了,那么守护线程会在任何时候甚至在一个操作的中间发生中断,所以守护线程永远不应该去访问固有资源。
7. 工作队列workQueue
有3种最常见的队列类型:
直接交接:SynchronousQueue
- 工作任务不多的情况下,只是将任务中转,就可以用
SynchronousQueue
,这个队列本身内部没有容量,使用这种队列,maximumPoolSize
就可能需要设置的大一些。因为没有队列容量作为缓冲了,很容易创建新线程。
无界队列:LinkedBlockingQueue
- 这种队列容量无限大,可以防止流量突增。设置
maximumPoolSize
也用不到,因为队列装不满,永远不需要创建新的非核心线程。但是也有风险,处理任务的速度跟不上提交的速度,可能造成内存浪费或者OOM
。
有界的队列:ArrayBlockingQueue
- 这种队列可以设置大小,这样设置
maximumPoolSize
参数就有意义了,队列满了就创建新的非核心线程执行任务,不超过maximumPoolSize
即可。
8. 线程池应该手动创建还是自动创建?
其实手动创建更好,因为这样可以更加明确线程池的运行规则,避免资源耗尽的风险。
自动创建线程池(即直接调用JDK
封装好的构造方法)可能带来一些风险。
8.1 newFixedThreadPool
// 我们执行如下代码发生了什么
ExecutorService executorService = Executors.newFixedThreadPool(4)
查看源码
......
// ThreadPoolExecutor函数声明是这样的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
......
public static ExecutorService newFixedThreadPool(int nThreads) {
// 这里核心线程数和最大线程数都是传进来的nThreads,这里是4,后面keepAliveTime设置为0,因为没什么意义,不会创建非核心线程,也不会被回收。
// 因为线程数量固定了,万一传进来非常多任务怎么办,LinkedBlockingQueue无界队列,容纳无数任务,防止流量突增。
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
所以因为无界队列LinkedBlockingQueue
的存在,使用newFixedThreadPool
容易造成大量内存占用,可能会导致OOM
,下面我们用例子体会一下这种情况造成的OOM
。
VM options
设置为-Xmx8m -Xms8m
,这样更快达到OOM
public class FixedThreadPoolOOM {
private static ExecutorService executorService = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000000000);// 一直睡眠,塞满队列,体会OOM
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
很快就会报错
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at threadpool.FixedThreadPoolOOM.main(FixedThreadPoolOOM.java:14)
8.2 newSingleThreadExecutor
// 执行下面代码会发生什么
ExecutorService executorService = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
// 可以看到这个和上一个例子很相似,就是固定大小为1的线程池,不创建非核心线程,有LinkedBlockingQueue作为缓冲
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
那么同理,当请求堆积的时候,可能会占用大量的内存线程池,原理和newFixedThreadPool
一样,只是线程数量设为了1
8.3 newCachedThreadPool
可以缓存线程池,具有自动回收多余线程的功能。
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
运行的部分截图如下,甚至看到了创建了958个线程
我们看看源码
public static ExecutorService newCachedThreadPool() {
// 核心线程数为0,所有的非核心线程均可回收。线程最大数为Integer.MAX_VALUE,这几乎看作无限大没有限制了。
// 非核心线程回收的时间为60秒,利用SynchronousQueue交换队列交换数据,即又更多任务进来利用SynchronousQueue交换再创建新线程。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这种创建方式的弊端在于第二个参数maximumPoolSize
被设置为了Integer.MAX_VALUE
,这可能会创建数量非常多的线程,甚至导致OOM
8.4 newScheduledThreadPool
一般用来执行和时间相关的任务
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
// threadPool.schedule(new Task(), 5, TimeUnit.SECONDS); // 延迟5s执行Task任务
threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); // 最初延迟1s执行Task,之后每隔3s执行Task
}
}
代码结果就不打印了
来看看源码
......
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
......
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 可以看到这里设置核心线程数、最大线程数、非核心线程回收等待时间为0、延迟队列DelayedWorkQueue达到时间执行任务等等,原理和newCachedThreadPool一样,请看上一个例子
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
......
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
原因和newCachedThreadPool
一样
综上,这几种自动创建均有缺陷。
正确的创建线程池的方法:根据不同的业务场景,设置线程池参数。比如内存有多大,给线程取什么名字等等。
9. 线程池里的线程数量设定为多少比较合适?
这个得看任务类型
CPU
密集型(加密、计算hash
等):最佳线程数为CPU
核心数的1-2倍左右。- 耗时
IO
型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU
核心数很多倍 ,参考Brain Goetz
专家推荐的计算方法:线程数=CPU
核心数*(1+平均等待时间/平均工作时间)
如果需要更精确的线程数量,那就需要根据不同的程序去做压测,这样就能得到比较合适的线程数量。
10.停止线程池的正确方法
10.1 shutdown
不一定立即停止。执行了该方法之后,后面再请求执行的任务会拒绝,当前线程正在执行的任务和任务队列等待的任务还是会执行完才会停止线程池。即存量任务等待执行完毕,新任务拒绝。
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; ++i) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);// 先执行1500ms再去停止
executorService.shutdown();
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
运行结果:
执行shutdown()
之后,再来一个任务会被拒绝,抛出异常java.util.concurrent.RejectedExecutionException...Shutting down, pool size = 10, active threads = 10, queued tasks = 70, completed tasks = 20
,已经提示正在关闭,后面的任务不接受了。队列还有70各任务等待,正在执行10个,已完成20个。
那总不能每次执行一个新任务看是否被拒绝来判断是否正在停止吧?于是有了isShutdown
方法
10.2 isShutdown
只要执行了shutdown
方法,isShutdown
方法就会返回true
,只要把上面代码executorService.shutdown();
前后各添加一句System.out.println(executorService.isShutdown());
就可以看到一个false
一个true
,打印完成后,任务队列还有任务在继续执行打印。
10.3 isTerminated
可以判断线程池是不是已经完全终止
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; ++i) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
System.out.println(executorService.isTerminated()); // false 此时队列还有任务
Thread.sleep(10000);
System.out.println(executorService.isTerminated()); // true 等待时间足够长,线程池已完全停止
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
执行结果:
中间打印false
:
等待10s后,最后打印true
,表示线程池已经完全停止。
10.4 awaitTermination
测试一段时间内任务是否执行完毕,作判断用。执行这个方法后指定时间内,线程处于阻塞状态,线程运行完毕则返回true
,否则返回false
,代表超时。
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
boolean b = executorService.awaitTermination(1L, TimeUnit.SECONDS);
// 1s内肯定执行不完,阻塞等待1s后返回false
System.out.println(b);
// 7s内执行完了,阻塞等待着,然后提前执行完就直接返回true
boolean b1 = executorService.awaitTermination(7L, TimeUnit.SECONDS);
System.out.println(b1);
}
}
// ShutDownTask代码之前例子已给出
有3种情况会返回awaitTermination()
会返回值,返回前是阻塞的。
1.所有任务执行完毕了,返回true
2.等待的时间到了,返回false
3.等待过程被中断,抛出InterruptedException
10.5 shutdownNow
试终止所有正在执行的任务,线程池正在执行的线程收到中断信号,并停止处理等待队列中的的任务,最后将所有未执行的任务以列表的形式返回
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
List<Runnable> runnableList = executorService.shutdownNow();
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
11. 任务拒绝策略
任务拒绝时机
- 当
Executor
关闭时,提交新任务会被拒绝。 - 以及当
Executor
对最大线程和工作队列容量使用有限边界并且已经饱和时。
11.1 AbortPolicy
直接抛出异常java.util.concurrent.RejectedExecutionException
,和之前一样,调用shutdown()
之后,还去执行新任务executorService.execute(new ShutDownTask())
11.2 DiscardPolicy
把任务默默丢弃,并且不会通知
11.3 DiscardOldestPolicy
丢弃最老的存在时间最久的任务
11.4 CallerRunsPolicy
这种策略有2大好处。
1.任务拒绝后让提交任务的线程去执行,比如主线程调用的execute
方法,任务爆满拒绝后让主线程代劳执行,避免了业务损失。
2.给主线程代劳执行主线程此刻就不能继续添加新任务了,就必须把代劳的任务执行完才可以执行新任务,执行代劳任务的时候线程池的其他任务也能同时执行完一些,给了线程池的线程执行任务的一些缓冲时间。
12. 线程池的暂停和恢复
这里涉及到钩子函数,代码中的构造函数跳过不用看。
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition(); // 返回与此Lock实例一起使用的Condition实例
private boolean isPaused;
// 下面几个是创建构造器用super匹配父类构造器,避免语法报错
// ========================下面几个构造不用看=========================
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}
// ============================上面这些构造不用看========================
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 线程执行之前调用
super.beforeExecute(t, r);
System.out.println("钩子======");
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 1000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
打印结果
13. 线程池的实现原理和源码分析
线程池组成部分
- 线程池管理器(
ThreadPool
)
用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务。 - 工作线程(
PoolWorker
)
我们创建出来执行任务的线程,会反复从队列中取任务,然后去执行。 - 任务队列(
taskQueue
)
存放任务的队列有很多种,把没有处理的任务放在队列中,因为线程池会同时有多个线程去队列中取任务,所以任务队列必须是支持并发的。所以线程池选用线程安全的BlockingQueue
来作为任务队列。 - 任务接口(
Task
)
线程从任务队列中取出来的任务
13.1 Executor家族?
线程池、ThreadPoolExecutor
、ExecutorService
、Executor
、Executors
等这么多和线程池相关的类,都是什么关系?
Executor
顶层接口就是用来执行方法的public interface Executor { void execute(Runnable command); }
ExecutorService
继承了Executor
接口,还增加了新的方法,拥有了初步管理线程池的方法。Executors
是一个工具类,快速创建线程池都是用的它,比如创建newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
、newScheduledThreadPool
这些线程池等
- 创建线程池为什么返回的不是
ThreadPoolExecutor
对象呢而是一个ExecutorService
?如下:ExecutorService executorService = Executors.newFixedThreadPool(4);
其实我们看源码知道,返回的其实是new ThreadPoolExecutor
,而返回值类型是ExecutorService
,因为ExecutorService
是ThreadPoolExecutor
的间接的一个父接口(其父类实现的接口),这就是多态。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
13.2 线程池实现线程复用的原理
不管愿不愿意,你必须看点源代码才能理解,我尽量不堆砌多余的源代码
// ThreadPoolExecutor类的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl记录了线程池状态和线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果此时的线程数小于核心线程数
// 传入参数1:即将让线程执行的任务
// 参数2:true表示增加线程的时候判断当前线程数是否小于corePoolSize,false表示判断当前线程数是否小于maximumPoolSize
if (addWorker(command, true))
return;
c = ctl.get();
}
// 判断到这里说明当前线程数大于等于corePoolSize,线程池如果正在运行状态,那么就把Runnable任务放在工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); // 获取线程池状态
// 如果已经不在运行状态了,移除这个Runnable,并且拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 任务可以发生异常导致线程不再执行,如果此时工作线程为0,但是任务队列里面可能还有任务,这样为了避免卡死
// 新建一个核心工作线程,去执行队列剩余任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到这个else if逻辑说明,程数大于等于corePoolSize,并且要么此时线程池处于不再运行状态了或者说任务队列已经满了,
// 那么就去增加核心工作线程,直到线程数大于maximumPoolSize时,拒绝后面任务。
// addWorker会返回是否新增线程成功,true-成功,false-失败
else if (!addWorker(command, false))
reject(command);
}
需要注意的是,addWorker
方法里面会有new Worker(runnable对象)
,代表新建一个工作线程(核心线程),最终会执行到runWorker
,runWorker
方法里面会获取当前Runnable
任务,拿不到就到阻塞队列去取出来
// Worker类里面runWorker方法部分代码如下
Runnable task = w.firstTask;// 这个firstTask是Runnable对象
......
// 这里有task = getTask()去阻塞队列取任务
while (task != null || (task = getTask()) != null) {
...
task.run(); // 线程开始执行任务
...
}
这个Worker
类里面的runWorker
方法描述了同一个线程执行不同的任务的原因,整体能看到线程复用的过程,这些线程是怎么一个个反复执行新任务的。
代码就是这样,如果一步步跟进需要大量贴源代码,由于篇幅原因,这里就不贴了,请自行根据编译器查看源码。
13.3 线程池的状态
线程池一共5种状态
欢迎一键三连~
有问题请留言,大家一起探讨学习
----------------------Talk is cheap, show me the code-----------------------