前言
前段时间写过一篇《线程池没你想的那么简单》,和大家一起撸了一个基本的线程池,具备:
- 线程池基本调度功能。
- 线程池自动扩容缩容。
- 队列缓存线程。
- 关闭线程池。
这些功能,最后也留下了三个待实现的 features
。
- 执行带有返回值的线程。
- 异常处理怎么办?
- 所有任务执行完怎么通知我?
这次就实现这三个特性来看看 j.u.c
中的线程池是如何实现这些需求的。
再看本文之前,强烈建议先查看上文《线程池没你想的那么简单》
任务完成后的通知
大家在用线程池的时候或多或少都会有这样的需求:
线程池中的任务执行完毕后再通知主线程做其他事情,比如一批任务都执行完毕后再执行下一波任务等等。
以我们之前的代码为例:
总共往线程池中提交了 13 个任务,直到他们都执行完毕后再打印 “任务执行完毕” 这个日志。
执行结果如下:
为了简单的达到这个效果,我们可以在初始化线程池的时候传入一个接口的实现,这个接口就是用于任务完成之后的回调。
public interface Notify { /** * 回调 */ void notifyListen() ; }
以上就是线程池的构造函数以及接口的定义。
所以想要实现这个功能的关键是在何时回调这个接口?
仔细想想其实也简单:只要我们记录提交到线程池中的任务及完成的数量,他们两者的差为 0 时就认为线程池中的任务已执行完毕;这时便可回调这个接口。
所以在往线程池中写入任务时我们需要记录任务数量:
为了并发安全的考虑,这里的计数器采用了原子的 AtomicInteger
。
而在任务执行完毕后就将计数器 -1 ,一旦为 0 时则任务任务全部执行完毕;这时便可回调我们自定义的接口完成通知。
JDK 的实现
这样的需求在 jdk 中的 ThreadPoolExecutor
中也有相关的 API
,只是用法不太一样,但本质原理都大同小异。
我们使用 ThreadPoolExecutor
的常规关闭流程如下:
executorService.shutdown(); while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info("thread running"); }
线程提交完毕后执行 shutdown()
关闭线程池,接着循环调用 awaitTermination()
方法,一旦任务全部执行完毕后则会返回 true
从而退出循环。
这两个方法的目的和原理如下:
- 执行
shutdown()
后会将线程池的状态置为关闭状态,这时将会停止接收新的任务同时会等待队列中的任务全部执行完毕后才真正关闭线程池。
awaitTermination
会阻塞直到线程池所有任务执行完毕或者超时时间已到。
为什么要两个 api
结合一起使用呢?
主要还在最终的目的是:所有线程执行完毕后再做某件事情,也就是在线程执行完毕之前其实主线程是需要被阻塞的。
shutdown()
执行后并不会阻塞,会立即返回,所有才需要后续用循环不停的调用 awaitTermination()
,因为这个 api 才会阻塞线程。
其实我们查看源码会发现,ThreadPoolExecutor
中的阻塞依然也是等待通知机制的运用,只不过用的是 LockSupport
的 API
而已。