💪🏻 制定明确可量化的目标,坚持默默的做事。
什么是线程池?
线程池主要是为了解决执行新任务执行时,应用程序为减少为任务创建一个新线程和任务执行完毕时销毁线程所带来的开销。通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务时重用这些线程而不是每次都新建一个线程,一旦任务已经完成了,线程回到线程池中并等待下一次分配任务,达到资源复用的效果。
线程池主要优势?
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
如何创建线程?
通过Executors 创建线程池 |
newSingleThreadExecutor:创建一个只有一个线程的线程池,串行执行所有任务,即使空闲时也不会被关闭。可以保证所有任务的执行顺序按照任务的提交顺序执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。 适用场景:需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程活动的应用场景。 newFixedThreadPool:创建一个固定线程数量的线程池(corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列)。初始化时线程数量为零,之后每次提交一个任务就创建一个线程,直到线程达到线程池的最大容量。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。 适用场景:为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。 newCachedThreadPool:创建一个可缓存的线程池,线程的最大数量为Integer.MAX_VALUE。空闲线程会临时缓存下来,线程会等待60s还是没有任务加入的话就会被关闭。 适用场景:适用于执行很多的短时间异步任务的小程序,或者是负载较轻的服务器。 newScheduledThreadPool:创建一个支持执行延迟任务或者周期性执行任务的线程池。 |
ThreadPoolExecutor | 阿里巴巴开发手册并发编程有一条规定:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这是为什么呢?主要是因为这样的可以避免资源耗尽的风险,因为使用Executors返回线程池对象的弊端有: FixedThreadPool 和 SingleThreadPool 允许的阻塞队列长度为 Integer.MAX_VALUE,这样会导致堆积大量的请求,从而导致OOM; CachedThreadPool 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。 所以创建线程池,最好是根据线程池的用途,然后自己创建线程池。 |
一、线程池系列相关文章
1.1 Java线程池ThreadPoolExcutor01-参数说明
1.2 Java线程池ThreadPoolExcutor02-阻塞队列之ArrayBlockingQueue
1.3 Java线程池ThreadPoolExcutor03-阻塞队列之LinkedBlockingQueue
1.4 Java线程池ThreadPoolExcutor04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1.5 Java线程池ThreadPoolExcutor05-阻塞队列之DelayQueue原理及扩容机制详解
1.6 Java线程池ThreadPoolExcutor06-阻塞队列之SynchronousQueue
1.7 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedTransferQueue
1.8 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedBlockingDeque
1.9 Java线程池ThreadPoolExcutor08-4种拒绝策略
二、继承实现关系图
三、低层数据存储结构
3.1 核心属性
public class ThreadPoolExecutor extends AbstractExecutorService { ... private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; ... }
3.1.1 说明
属性名 | 说明 |
COUNT_BITS | 用于计算线程池的状态值、容量 |
workQueue | 阻塞队列。七个: ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。详见1.2链接。 LinkedBlockingQueue: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene。详见1.3链接。 PriorityBlockingQueue:具有优先级的无界阻塞队列。详见1.4链接。 DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。详见1.5链接。 SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。详见1.6链接。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。详见1.7链接。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。详见1.8链接。 |
threadFactory |
线程工厂 |
handler | 拒绝策略处理类。四种:AbortPolicy策略(默认)、DiscardPolicy策略、DiscardOldestPolicy策略 和 CallerRunsPolicy策略。详见1.9链接。 |
keepAliveTime |
为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余 的空闲线程会被立即终止。 |
allowCoreThreadTimeOut | 如果为true,核心线程使用keepAliveTime超时等待工作。 如果为false(默认),核心线程即使在空闲时也保持活动。 |
corePoolSize | 核心线程数 |
maximumPoolSize | 最大线程数 |
3.1.2 线程池五种状态
属性ctl 是ThreadPoolExecutor内部一个用来进行技术和状态控制的控制变量,它使用了一个原子整形字段来实现两个方面的管理:
- 低29位记录线程池的线程数,
- 高3位记录线程池的工作状态
五种状态:
- RUNNING:线程池一旦被创建处于RUNNING状态。线程池处于RUNNING状态时,能够接收新任务以及对已添加的任务进行处理。
- SHUTDOWN:线程池处于SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING状态转变为SHUTDOWN状态。
- STOP: 线程池处于STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由RUNNING状态或者SHUTDOWN状态变为STOP状态。
- TIDYING: 当所有的任务已终止,ctl记录的任务数为0,线程池的状态会变为TIDYING状态。当线程池状态为SHUTDOWN时,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN状态变为 TIDYING状态;当线程池为STOP时,线程池中执行的任务为空时,就会又STOP状态变为 TIDYING状态。
- TERMINATED: 线程池彻底终止,就会变成TERMINATED状态。线程池处于TIDYING状态时,调用terminated()就会由TIDYING状态变为TERMINATED状态。
状态转换如下图:
图3.1.2-1
3.2 构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
参数名 | 说明 |
corePoolSize | 核心线程数 |
maximumPoolSize | 最大线程数 |
keepAliveTime | 为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。 |
unit | keepAliveTime的单位 |
workQueue | 阻塞队列 |
threadFactory | 线程工厂 |
handler | 拒绝策略处理类 |
四、工作原理
- 如果任务null直接退出,否则执行步骤2;
- 若工作线程数小于核心线程数,执行步骤3创建工作线程并执行任务,否则执行步骤4。
- 若阻塞队列已满
- 工作线程数小于最大线程数,执行步骤6创建工作线程并执行任务,否则执行步骤10 拒绝任务(执行拒绝策略)
- 若阻塞队列未满,添加任务到阻塞队列,若线程状态不为运行中,则任务从队列中取出,并执行步骤10 拒绝任务(执行拒绝策略)
五、源码解析
5.1 核心方法execute()
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 若添加工作任务执行任务不成功,尝试添加到阻塞队列中 c = ctl.get(); } // 线程数超过核心线程数,任务添加到阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 添加到阻塞队列,但若线程池状态不是运行中,则从队列中取出 if (! isRunning(recheck) && remove(command)) // 并且执行拒绝策略 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 阻塞队列已满,则若工作线程数小于最大线程数 创建线程执行任务 else if (!addWorker(command, false)) // 若线程数已达最大线程数,否则执行拒绝策略 reject(command); }
从源码中可以清晰看出线程池执行任务的逻辑与“四、工作原理”所述一致。
5.2 添加任务addWork()
THreadPoolExecutor内部类Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //当前Worker所处于的线程 final Thread thread; //待执行的任务 Runnable firstTask; //任务计数器 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } }
它是对Runnable进行了封装,主要功能是对待执行的任务进行中断处理和状态监控。Worker还继承了AQS,在每个任务执行时进行了加锁的处理。可以将Worker简单理解为可中断的、可进行锁处理的Runnable。
创建工作线程并执行是addWork()方法,源码如下:
private boolean addWorker(Runnable firstTask, boolean core) { // 自旋,判断线程池状态,并对线程数量执行原子+1操作 retry: for (;;) { int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); // 如果线程池已经关闭,则直接返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 判断线程数是否已达上限,根据传入参数core的不同,判断corePoolSize或者maximumPoolSize。 // 如果线程数已达上限,直接返回false int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 执行原子操作,对线程数+1 if (compareAndIncrementWorkerCount(c)) // 执行原子操作成功,则退出自旋 并 创建工作线程执行任务 break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } // ctl变量操作成功,执行Worker相关逻辑 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建一个新的Worker,传入待执行的任务 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 加锁后,再次判断线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 如果创建了新的Worker,则调用其start方法立即执行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWork()方法首先在一个自旋中做三个判断:
- 线程是否关闭,若关闭则直接返回false退出
- 通过参数core来确定工作线程数与核心线程数比较 还是 与最大线程数比较,若工作线程数大,则返回false退出
- CAS尝试将线程数加1,若成功则创建一个辨析的Worker并立即执行其start()方法执行该任务。
5.3 执行任务runWork()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 指向线程池调execute添加的任务 Runnable task = w.firstTask; w.firstTask = null; // 首先释放锁,允许中断 w.unlock(); boolean completedAbruptly = true; try { // 从worker中取第1个任务,若任务为空则从阻塞队列中取任务,直到返回null,这里达到线程复用的效果,实现线程处理多个任务。 while (task != null || (task = getTask()) != null) { // 执行任务前先加锁 w.lock(); // 如果线程池已经终止,则中断该线程。保存了线程池在STOP状态下线程中断的,非STOP状态下线程没有被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 记录正在运行的任务 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务(调任务的run方法) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行完任务,清除记录信息 afterExecute(task, thrown); } } finally { task = null; // 当前Worker计数器+1,统计worker执行了多少任务,最后累加进completedTaskCount变量,可以调用相应方法返回一些统计信息。 w.completedTasks++; // 释放锁 w.unlock(); } } // 表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量 completedAbruptly = false; } finally { // completedTasks累加到completedTaskCount变量中 processWorkerExit(w, completedAbruptly); } }
runWorker()的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。
注意这行代码 while (task != null || (task = getTask()) != null) ,当task = w.firstTask 的值为null时执行task = getTask(), getTask是从任务列队是取任务。也就是说,Worker在执行完提交给自己的任务后,会执行任务队列中的任务。
5.4 阻塞队列取任务getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
说明:
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 这行代码体现出了 SHUTDOWN状态和 STOP状态的区别。若线程池状态为 SHUDOWN 状态,则条件为 false,取任务执行;而如果线程池的状态为 STOP 状态,则条件为 true,不管队列是否还有任务,不再处理了。
- timed后在的判断逻辑有点复杂,以下几种情况为true,CAS尝试将线程数减1
- 工作线程数大于最大线程数(后面wc>1||workQueue.isEmpty()应该自然满足)(可能是在运行中调用setMaximumPoolSize)
- 设置了allowCoreThreadTimeOut为true且队列中取的任务为null,说明没任务了
- 工作线程数大于核心线程数 且队列中取的任务为null(后面wc>1||workQueue.isEmpty()应该自然满足)
- try后面逻辑
- 延时取任务:allowCoreThreadTimeOut为true 或者 wc > corePoolSize
- 直接取任务(若没任务则阻塞等待):allowCoreThreadTimeOut为false 或者 wc <= corePoolSize
结论:
- allowCoreThreadTimeOut设置为true时,工作线程数达最大之后,因无新任务而线程减少,工作线程总数最小值可以为0
- allowCoreThreadTimeOut设置为false时,只有wc大于核心线程数,才去做CAS减线程数操作,所以工作线程数达到最大之后,因无新任务而线程减少,工作线程总数最小为核心线程数
5.5 Worker无任务最后处理processWorkerExit()
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); // 判断状态是否小于STOP if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
说明:
- decrementWorkerCount():如果为异常结束,则工作线程数减1
- try 逻辑:加锁累加完成任务数
- tryTerminate(): 尝试终止线程池
- 判断状态是否小于STOP为true
- allowCoreThreadTimeOut设置为true
- 若队列不为空:至少保留一个worker
- 若队列为空:直接退出,线程池的worker数减少,最终可能为0
- allowCoreThreadTimeOut设置为false: 则保持worker数不少于corePoolSize(若线程数小于corePoolSize,则添加 null任务的worker
总结worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。
5.6 关闭线程池
从图3.1.2-1看出有两种方法关闭线程池:
- shutdown: 不能再提交任务,已经提交的任务可继续执行;
- shutdownNow: 不能再提交任务,已经提交的任务未执行的任务不再执行,正在执行的任务可继续执行,但会中断,返回已提交未执行的任务
5.6.1 关闭线程池shutdown()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
说明:
- checkShutdownAccess(): 安装策略机构
- advanceRunState(SHUTDOWN): 线程池状态切换到SHUTDOWN状态
- interruptIdleWorkers(): 中断所有空闲的worker
- tryTerminate(): 尝试结束线程池
5.6.2 关闭线程池shutdownNow()
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
说明:
- checkShutdownAccess(): 安装策略机构
- advanceRunState(STOP): 线程池状态切换到STOP状态
- interruptWorkers(): 中断所有空闲的worker
- drainQueue(): 取出等待队列里未执行的任务
- tryTerminate(): 尝试结束线程池
5.6.3 尝试结束线程池tryTerminate()
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
说明:
- ctl.get():获取ctl值判断线程池状态,以下状态不处理直接return
- RUNNING状态,正在运行状态肯定不能停
- TIDYING或TERMINATED状态,已经没有正在运行的worker了
- SHUTDOWN状态且阻塞队列不为空,执行完才能停
- 工作线程数不为0。又调了一次interruptIdleWorkers(ONLY_ONE),可能疑惑在调tryTerminate之前时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?我们需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。
- try逻辑:加锁CAS尝试将线程池状态切换成TIDYING,再切换成TERMINATED状态,terminated是空方法供子类来实现。
5.6.4 判断线程池是否关闭awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
说明:接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。
ps: 以上是研读源码加上翻阅许多文献理解的总结,如有错误或不足的地方,欢迎指出,欢迎留言交流。我会继续努力学习和分享更多有干货的内容。