1 翻一翻ThreadPoolExecutor
1.1 类关系图
1.2 成员变量—线程池状态(生命周期)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池的五种状态(生命周期):
- RUNNING:正在运行,接受新任务和处理排队的任务
- SHUTDOWN:准备关闭,不接受新任务,但处理排队的任务
- STOP:停止,不接受新任务,不处理排队的任务,中断正在进行的任务
- TIDYING:整理,所有的任务都已经终止,workerCount为0,转换到TIDYING状态的线程将运行terminated()钩子方法
- TERMINATED:终止, terminated()已经完成这些值之间的数字顺序很重要,允许顺序比较。 runState随时间单调地增加,但不需要触及每个状态
1.3 四个构造方法
/** * 使用默认ThreadFactory和Handler */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * 使用默认Handler */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } /** * 使用默认ThreadFactory */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * 真正执行的构造方法 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
线程池的七大参数:
- corePoolSize:核心线程数,核心池大小是保持活动的最小工作线程数(不允许超时等),除非设置了allowCoreThreadTimeOut,在这种情况下,最小值为零。
- maximumPoolSize:最大线程数,实际的最大值在内部是由CAPACITY限制的。
- keepAliveTime:等待工作的空闲线程的超时(默认以纳秒计)。 当存在超过corePoolSize或allowCoreThreadTimeOut时,线程使用此超时。 否则他们将永远等待新的工作。
- unit:等待工作的空闲线程的超时(默认以纳秒计)的单位。
- workQueue:用于保存任务并将其传递给工作线程的队列。 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),所以只依赖isEmpty来查看队列是否为空(例如,当决定是否从SHUTDOWN转换到TIDYING时,我们必须这样做)。 这适用于特殊用途的队列,如DelayQueues,允许poll()返回null,即使稍后当延迟过期时它可能返回非null。
- threadFactory:工厂的新线程。所有线程都是使用这个工厂创建的(通过方法addWorker)。所有调用者都必须做好addWorker失败的准备,这可能反映了系统或用户限制线程数量的策略。即使没有将其视为错误,创建线程失败也可能导致新任务被拒绝或现有任务仍卡在队列中。我们进一步保留池不变量,甚至在遇到OutOfMemoryError之类的错误时,这些错误可能在尝试创建线程时抛出。这类错误相当常见,因为需要在Thread中分配本机堆栈。启动时,用户将希望执行清理池关闭来清理。可能会有足够的内存可用来完成清理代码,而不会遇到另一个OutOfMemoryError。
- handler:在执行中饱和或关闭时调用的处理程序。
1.4 execute方法
/** * 在将来某个时候执行给定的任务。 任务可以在新线程中执行,也可以在现有的池线程中执行。 * * 如果由于该执行器已关闭或已达到其容量,任务无法提交执行,则由当前处理 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 分三步进行: * 1. 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程。 addWorker的调 * 用会自动检查runState和workerCount,从而通过返回false来防止在不应该添加线程时产生的错误警报。 * 2. 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查后,现有的 * 已经死亡)或进入此方法后池关闭。 所以我们重新检查状态,如有必要则回滚正在排队的if停止,或启动一个新 * 的线程,如果没有。 * 3. 如果不能对任务进行排队,则尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了所以拒绝这个 * 任务。 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
2 看一看线程池的底层原理
2.1 线程的状态(生命周期)转换
- ctl:主线程池的状态,用AtomicInteger原子类表示,
- RUNNING:初始化时默认为RUNNING状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- SHUTDOWN:shutdown()方法,线程池不能够接受新的任务,它会等待所有任务执行完毕;
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
- STOP:shutdownNow()方法线程池不能接受新的任务,并且会去尝试终止正在执行的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
- TIDYING:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
- TERMINATED:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
2.2 线程池的阻塞队列
由此可见BlockingQueue是一个继承与Queue的接口,而他的具体实现最常用的无非一下几种
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
- DelayQueue:一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。不能将null元素放置到这种队列中。
- LinkedBlockingDuque:LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- LinkedTransferQueue:一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法。
- PriorityBlockingQueue:一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
2.3 线程池的执行过程
2.3.1 调用execute方法,参数为Runnable接口
用到了可重入锁和双重检查
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程 if (workerCountOf(c) < corePoolSize) { // 调用addWorker if (addWorker(command, true)) return; c = ctl.get(); } //如果不能对任务进行排队,则尝试添加一个新的线程。如果失败,拒绝这个任务。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
2.3.2 addWorker加入任务到线程池
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //查看状态 int c = ctl.get(); //查看正在运行的任务 int rs = runStateOf(c); // 只在必要时检查队列是否为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //worker数量 int wc = workerCountOf(c); //判断:核心线程判断是否超过核心线程数,非核心线程判断是否超过最大线程数,超过则返回FALSE if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //新任务是否启动 boolean workerStarted = false; //新任务是否已添加 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 保持锁定时重新检查,在ThreadFactory失败时退出 ,在获得锁之前关闭 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //预先检查t是可启动的 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { //添加任务异常 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
2.4 Worker的类结构
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * 这个类永远不会被序列化,但是我们提供了一个serialVersionUID来抑制javac警告。 */ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; /** * 使用给定的第一个任务和ThreadFactory中的线程创建。 * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // 在runWorker之前禁止中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 将主运行循环委托给外部的runWorker */ public void run() { runWorker(this); } // 锁方法 // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
2.5 线程池的拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
3 线程池的使用
[推荐文章](线程池的使用)