三、线程池状态
每个线程池都会带有一个原子整型,用来表示自己的状态
按bit位来分,(高3位)标记线程池状态,(低29位)表示线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
那么线程池总共有多少种状态呢?一共有五种,它们的变化关系如下:
- RUNNING
- 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
- 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,
- 就处于RUNNING状态,并且线程池中的任务数为0!
- SHUTDOWN
- 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
- 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
- STOP
- 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
- 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
- TIDYING
- 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
- 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
- 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
- TERMINATED
- 状态说明:线程池彻底终止,就变成TERMINATED状态。
- 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED
四、线程池的任务提交
在经过上一章的学习后,你应该已经得到了一个ExecutorService实例了,此时我们可以通过两种方式提交任务(Runnable):
tp.execute(runnable)
tp.submit(runnable) 或 tp.submit(task)
这两者我们该怎么选呢,下面我们就来详细讲一下
1. execute
execute只能用来执行runnable的实现类,而且没有返回值,事实上,这是因为run()本身就没有返回值导致的,因此这种方式,最好用来执行不需要知道结果的任务。
注意:当出现异常时,异常会被catch住,然后throw出来,本线程销毁
2. submit
submit其实内层调用的还是execute,此时传入execute的参数类型是RunnableFuture,同时继承Runnable 和 Future
submit可以用来提交Runnable或者Callable,Callable任务带有返回值,因此submit会有一个Future返回很合理,通过这个future.get()就能获取返回值;
但是runnable任务没有返回值,为什么也有一个Future返回呢?其实是你传的Runnable 最后还是会被封装成Callable后再执行,由于Runnable没有返回结果,所以在将Runnable包装为Callable的时候,会传入一个预期结果null,此时使用get方法返回一个null
注意:当使用的submit时,得益于FutureTask中有try-catch来存储异常,所以出现异常,FutureTask自己就消化并存起来,并可通过future.get()获取到异常,而不是直接往外抛,因此直接使用submit是不会报错的,线程池里的线程得以存活
五、线程执行异常
我们提交的任务,不总是能顺利执行,一旦出现异常,我们该怎么处理呢?
- 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常。这种方法比较简单,也有他的局限性,不够灵活且增大代码量
- 使用submit提交任务,在调用future.get()方法时,会将保存的异常重新抛出
- 在执行任务的过程中,如果出现异常,也可以通过自己写个类,继承ThreadPoolExecutor并重写该afterExecute()方法来处理,注意,此时线程还是因异常而终止了。
- 当一个线程因为未捕获的异常而退出时,JVM会把这个事件报告给应用提供的UncaughtExceptionHandler异常处理器,于是就有了第三种解决任务代码抛出异常的方案:为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常。
那如何为工作者线程设置UncaughtExceptionHandler呢?ThreadPoolExecutor的构造函数提供一个ThreadFactory,可以在其中设置我们自定义的UncaughtExceptionHandler,这里不再赘述。
注意:这个方案不适用于使用submit方式提交任务的情况,原因上面也提到了,FutureTask的run方法捕获异常后保存,不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常,线程也就不会退出,也不会执行我们设置的UncaughtExceptionHandler。
六、线程池执行步骤(简易)
这一章,我们会说一说当一个任务被提交进线程池,会经历什么步骤?我们可以看以下的精简步骤:
任务会优先以核心线程运行,当核心线程达到上限时,再往里面提交线程,会把线程放入队列中等待。除非队列放不下了,才会启用非核心线程来运行任务。所以不要用无界队列。如果非核心线程也满了,则执行拒绝策略
当然上面说的流程是一个大体方向,具体的细节我们只能通过源码来讲,如果你有源码恐惧症,我也给你提了一个精简版源码流程,如果你也喜欢看源码,可以看下一章的源码级流程。
- 我们先去创建一个Worker(内含一个线程) 并且把我们的任务传到Worker的firstTask变量里
- Worker创建完成以后调用runWorker方法;
- runWorker方法里面先把Worker自己的firstTask走完(调用runnable.run()),然后会通过getTask()方法从线程池的阻塞队列里面拿缓存的runnable
- 如果当前线程数超核心线程上限,getTask会以 poll(timeout, unit)取任务,一段时间取不到,就会返回null,Worker内部Thread的run方法因为没有后续任务而走完,线程生命周期结束;
- 如果当前线程数没有超核心线程上限,从队列拿任务时,是以take方法去拿,此时会让线程挂起,直到取到任务再返回
- 取到任务以后再去执行这个任务
七、线程池执行步骤(源码)
1.提交任务,判断是否新建线程执行,或者加入阻塞队列
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取当前线程池状态,包括线程数的情况 int c = ctl.get(); // 当前线程数小于指定的核心线程,直接加worker核心线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 代表大于核心线程数,或者加worker失败,加入阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 二次检查,如果当前线程池已关闭,则拒绝该任务 if (! isRunning(recheck) && remove(command)) reject(command); // 添加队列成功,但没有可用线程(如指定核心线程为0且当前没有非核心线程), // 以无命令 - 非核心形式新加worker,来执行队列里的任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 代表加入阻塞队列失败,加非核心worker else if (!addWorker(command, false)) // 代表加非核心worker,执行拒绝(内容由拒绝策略实现) reject(command); }
2.新增worker(加任务,将任务新建线程,并启动线程来执行)
private boolean addWorker(Runnable firstTask, boolean core) { // retry循环,本循环主要判断线程池状态是否支持新增一个worker retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 这个地方直接判断添加worker失败需要满足以下条件 // running是负数,shutdown是0,所以要求线程池的状态是大于等于shutdown,即 // 停工、停止、整理、终结状态,但有一种情况除外,即线程池状态是SHUTDOWN // 但是task是null且队列不为空时,对应的场景是addWorker(null, false) // 这是新建一个无命令的保底线程,执行阻塞队列里面的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 小循环,本循环主要目的为循环原子操作增加worker for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 原子性增加活动线程 if (compareAndIncrementWorkerCount(c)) break retry; // 原子性增加活动线程失败,查看最新的活动线程数和状态 c = ctl.get(); // Re-read ctl // 如果线程池状态变化了,跳出小循环,重新retry循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 这个是一个排他锁,只允许线程一个个排队进入执行 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 线程池是否在运行,或者是否是保底线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果这个worker带着的Thread已经在运行了,说明有问题,抛异常 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
3.任务执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 这个task(runnable)就是当初我们执行execute方法传入的参数, // 也就是我们要去完成的任务 Runnable task = w.firstTask; // worker会被复用,它的firstTask主动设置成null,方便java垃圾回收机制回收。 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循环执行,任务来自于传入的task,或队列里的task while (task != null || (task = getTask()) != null) { w.lock(); // 双重检查,确保线程中断和线程池stop是强关联,即线程池stop时 // 线程一定是被标记中断的,如果线程池不是stop,要把线程中断标志去掉, // 去掉标记后,再检查一遍线程池状态,如果此时状态变成stop了, // 还得把标志恢复回去 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 一个回调方法,可以继承重写在里面做一些想做的事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 待实现,异常的后置处理可以写这里 afterExecute(task, thrown); } } finally { // 将task置空,否则出不了循环 task = null; // 给当前的worker标记,告诉他你又完成了一个任务,如果是通过task!=null // 进来的这个completedTasks++完以后肯定是1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // worker执行完所有任务 后续处理 processWorkerExit(w, completedAbruptly); } }
4.获取其他任务 (获取阻塞队列里的任务Task)
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池状态为stop,或者阻塞队列空了,减少worker,直接返回空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 上面的if判断如果过去了,那当前状况其实只有两种 // 一种是Running 另外一种是shutDown但是workQueue不为空 // 这两种状况都会继续让worker干活 int wc = workerCountOf(c); // worker可以过期吗? // 如果线程数超核心线程限制,或核心线程也有超时限制,则返回true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 分为两部分看 // 第一部分(wc > maximumPoolSize || (timed && timedOut) // 判断需不需要减少worker, 有需要则判断 // 第二部分(wc > 1 || workQueue.isEmpty()) // 判断有没有能力,当前条件能不能减少worker if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 是否需要过期一些线程, 如果是,则以超时限制从队列中取一个任务, // 如果取不到然后超时了,则返回null,取得到则返回task // 如果不会过期,则以take方式阻塞住,直到返回一个任务 // 这里是线程复用或终结的关键, Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 执行到这里,说明如果有时限,worker肯定没获取到新任务而且超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
5.执行完task后的后续处理
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 异常退出,worker数量减一 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 汇总该worker(一个worker实际上就是一个线程)完成的任务数量到线程池 completedTaskCount += w.completedTasks; // 将本worker从worker集合中剔除 workers.remove(w); } finally { mainLock.unlock(); } //退出woker的时候,检查下线程池状态 tryTerminate(); int c = ctl.get(); // 线程池的状态是runing 或者 shutdown if (runStateLessThan(c, STOP)) { // 是因为没有任务了,而非线程异常,才要终止这个线程的 if (!completedAbruptly) { // 保留的最小线程 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 执行到这里,说明有异常,或者当前活跃线程小于最小要求,建个保底线程 addWorker(null, false); } }
总结
上面我们已经非常详细的讲解了线程池的方方面面,对于线程的使用,注意事项乃至原理,应该都有相当深刻的了解了,如果你有什么补充和意见,也欢迎评论区留下你的想法