首先要知道在线程池中空余线程被回收的条件:当线程池中的线程数量大于 corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过 keepAliveTime
。还有就是线程池执行 Task 的流程(这里借用《Java 并发编程的艺术》这本书中的一张图):
JUC 中的线程池使用很简单,但是源码还是有一定的复杂度的,那么这个地方从何下手呢。首先需要猜测判断回收条件的时机。根据上面的回收条件可以很自然的想到先去看看 getTask()
方法,这个方法就是从工作队列中获取 Task:
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker 4 * must exit because of any of: 5 * 1. There are more than maximumPoolSize workers (due to 6 * a call to setMaximumPoolSize). 7 * 2. The pool is stopped. 8 * 3. The pool is shutdown and the queue is empty. 9 * 4. This worker timed out waiting for a task, and timed-out 10 * workers are subject to termination (that is, 11 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 12 * both before and after the timed wait, and if the queue is 13 * non-empty, this worker is not the last thread in the pool. 14 * 15 * @return task, or null if the worker must exit, in which case 16 * workerCount is decremented 17 */ 18 private Runnable getTask() { 19 //上次从队列中 poll() 是否超时 20 boolean timedOut = false; // Did the last poll() time out? 21 22 for (;;) { 23 int c = ctl.get(); 24 int rs = runStateOf(c); 25 26 // Check if queue empty only if necessary. 27 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 28 decrementWorkerCount(); 29 return null; 30 } 31 //从名称中可以判断,工作的线程数 32 int wc = workerCountOf(c); 33 34 // Are workers subject to culling? 从这里开始,值得关注 35 //是否剔除 worker?核心线程是否超时或工作线程数大于核心线程数 36 // allowCoreThreadTimeOut:If false (default), core threads stay alive even when idle.If true, core threads use keepAliveTime to time out waitingfor work. 37 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 38 //由于可能在这个过程中执行了 setMaximumPoolSize() 方法。 39 //((如果工作线程数大于最大线程数)或者(上次从队列中 poll()是否超时 且 需要剔除工作线程))且(工作线程数大于 1 或 工作队列是空的) 40 if ((wc > maximumPoolSize || (timed && timedOut)) 41 && (wc > 1 || workQueue.isEmpty())) { 42 if (compareAndDecrementWorkerCount(c)) 43 return null; 44 continue; 45 } 46 47 //poll() 和 take() 都可以从队列中获取任务出来。具体使用哪个方法根据 timed 来判断。 take() 是没有就阻塞直到有,这里的 poll() 加了一个超时控制。 其实熟悉阻塞队列就应该知道取出元素的方法有两个poll()和take(), 前者是一个非阻塞方法,如果当前队列为空,直接返回,而take()是一个阻塞方法, 即如果当前队列为空,阻塞线程,封装线程到AQS的条件变量的条件队列中, 而上面的方法是一个介于二者之间的方法,语义是如果队为空,该方法会阻塞线程, 但是有一个阻塞时间,如果到时见还没有被唤醒,就自动唤醒; 看到这里就应该知道了,我们的线程在获取任务时,如果队列中已经没有任务, 会在此处阻塞keepALiveTime的 时间,如果到时间都没有任务, 就会return null(不是直接返回null,是最终),然后在runWorker()方法中,执行 processWorkerExit(w, completedAbruptly);终止线程; 48 try { 49 Runnable r = timed ? 50 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 51 workQueue.take(); 52 if (r != null) 53 return r; 54 timedOut = true; 55 } catch (InterruptedException retry) { 56 timedOut = false; 57 } 58 } 59}
从这个方法中貌似没有找到很明显的具体的时机,但是可以结合线程池执行任务时从 BlockingQueue
中 getTask()
和 这个方法注释中的第 4 点:
* 4. This worker timed out waiting for a task, and timed-out 2 * workers are subject to termination (that is, 3 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 4 * both before and after the timed wait, and if the queue is 5 * non-empty, this worker is not the last thread in the pool.
可以考虑去看看线程池的执行 Task 的方法,线程池执行 Task 本质就是通过 ThreadFactory
构建出来的 Thread
去 执行 Worker
中的 run()
方法:
1 /** Delegates main run loop to outer runWorker */ 2 public void run() { 3 runWorker(this); 4 } 1 final void runWorker(Worker w) { 2 ... 3 ... 4 try { 5 while (task != null || (task = getTask()) != null) { 6 ... 7 } 8 } finally { 9 processWorkerExit(w, completedAbruptly); 10 } 11 } 1/** 2 * Performs cleanup and bookkeeping for a dying worker. Called 3 * only from worker threads. Unless completedAbruptly is set, 4 * assumes that workerCount has already been adjusted to account 5 * for exit. This method removes thread from worker set, and 6 * possibly terminates the pool or replaces the worker if either 7 * it exited due to user task exception or if fewer than 8 * corePoolSize workers are running or queue is non-empty but 9 * there are no workers. 10 * 11 * @param w the worker 12 * @param completedAbruptly if the worker died due to user exception 13 */ 14 private void processWorkerExit(Worker w, boolean completedAbruptly) { 15 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 16 decrementWorkerCount(); 17 18 final ReentrantLock mainLock = this.mainLock; 19 mainLock.lock(); 20 try { 21 completedTaskCount += w.completedTasks; 22 //从 workers 中 remove 了一个 worker,即移除了一个工作线程 23 workers.remove(w); 24 } finally { 25 mainLock.unlock(); 26 } 27 28 tryTerminate(); 29 30 int c = ctl.get(); 31 if (runStateLessThan(c, STOP)) { 32 if (!completedAbruptly) { 33 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 34 if (min == 0 && ! workQueue.isEmpty()) 35 min = 1; 36 if (workerCountOf(c) >= min) 37 return; // replacement not needed 38 } 39 addWorker(null, false); 40 } 41 }
可以看到这个 processWorkerExit()
方法是在 runWorker()
方法的 finally
代码块中。即如果跳出了 runWorker()
方法的 while
循环,就会执行 processWorkerExit()
方法。跳出 while
循环的条件是 task 为 null 或者 getTask()
获取的结果为 null。在 processWorkerExit()
中,会从 workers
中移除 worker。说白了整个 Worker
的生命周期大致可以理解为:线程池干活了(execute()
/ submit()
),然后就是正式干活了(runWorker()
),使用 getTask()
获取任务(中间会有一系列的判断(corePoolSize
是否达到,任务队列是否满了,线程池是否达到了 maximumPoolSize
,超时等),如果没有 task 了,就进行后期的扫尾工作并且从 workers
中移除 worker。
注意上面的 addWorker(null, false); 这个其实是个注意点优化点 当执行一开始自己写的Runnable command 的run方法如果执行异常 也会走上面的代码completedAbruptly=true 然后执行addWorker(null, false); 创建一个空的worker线程继续执行
重点:所以run里最好是把所有的异常捕获 而不要抛出 不然抛出异常 就会立刻销毁线程 然后创建一个新线程 线程池的作用就没有
其中,核心线程数也是可以被销毁的
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; allowCoreThreadTimeOut 默认是false,但是如果设置成true,核心线程数也可以销毁