前言
本文进行JavaSE基础内容:Executor执行器体系的整体介绍。该文是整体框架介绍,并非局限于某一个使用的细节。由于我不止一次的被咨询说ExecutorService和ScheduledExecutorService什么区别和联系,以及ThreadPoolExecutor和ThreadPoolTaskExecutor有什么不一样之类的问题,因此决定写此文科普一下。
本文内容不深,但比较全,因此我相信绝对是你值得阅读的一篇文章(即使你可能已经工作了5年+)。
说明:流行框架如Spring、MyBatis、Netflix等一般均对Executor体系有所扩展,本文只讲述JDK的内容,其它的举一反三即可。
正文
java.util.concurrent.Executor它属于J.U.C体系里的重要一员,是在JDK 1.5之后新增的内容,由大名鼎鼎的Doug Lea大神操刀。因为它的出现才使得我们大量的使用多线程开发程序成为了可能。
Executor执行器体系整体的框架可描述为如图所示(只例举出重要的API,覆盖绝大部分场景):
Executor 执行器
执行器,可执行任意一个Runnable任务。该接口提供了一种将任务提交与如何运行每个任务的机制(包括线程使用、调度等细节)分离的方法。因此任务它自己并不需要关心线程的创建、调度细节。
画外音:可能直接执行,也可能把你交给线程池执行,总之我帮你执行就欧克了
public interface Executor { // @since 1.5 void execute(Runnable command); }
需要注意的是:该执行器并不规定是同步执行还是异步执行你提交上来的任务,下面分别举例。
画外音:你之前一直以为
Executor
一定是异步,那是错的。应该说:绝大部分情况下我们会去异步执行,且绝大部分情况下均会使用线程池。但绝大多数并不代表所有
同步执行任务
@Test public void fun1() { String mainThreadName = Thread.currentThread().getName(); System.out.println("----------主线程[" + mainThreadName + "]开始----------"); // 自己定义一个同步执行器 Executor syncExecutor = (Runnable command) -> command.run(); // 提交任务 syncExecutor.execute(() -> { String currThreadName = Thread.currentThread().getName(); System.out.println("线程[" + currThreadName + "] 我是同步执行的..."); }); }
运行程序,控制台打印:
----------主线程[main]开始---------- 线程[main] 我是同步执行的...
特点:自己直接显示调用Runnable#run那便是简单的方法调用而已,属于同步
异步执行任务
@Test public void fun2() throws InterruptedException { String mainThreadName = Thread.currentThread().getName(); System.out.println("----------主线程[" + mainThreadName + "]开始----------"); // 自己定义一个异步执行器 Executor asyncExecutor = (Runnable command) -> new Thread(command).start(); // 提交任务 asyncExecutor.execute(() -> { String currThreadName = Thread.currentThread().getName(); System.out.println("线程[" + currThreadName + "] 我是异步执行的..."); }); TimeUnit.SECONDS.sleep(1); }
运行程序,控制台打印:
----------主线程[main]开始---------- 线程[Thread-0] 我是异步执行的...
特点:new了一个Thread去执行command任务,调度交由系统去掌控,属于异步。
通过如上两个示例可以看到Executor它并不代表线程池(线程池的英文是:ThreadPool好麽~),仅是任务执行器而已。抽象出这么一个接口是想屏蔽掉内部执行、调度的细节,方面使用者的使用而已。
ExecutorService
Executor过于抽象,仅代表着任务的执行(甚至同步、异步都没规定),因此直接通过实现该顶层接口的类并没有,平时使用得最多还是这个子接口ExecutorService:提供更多的服务。
// @since 1.5 public interface ExecutorService extends Executor { // 关闭。执行已经提交的任务,但不接受新任务。 // 此方法用于关闭不需要使用的执行器,内部会做资源回收的操作,如回收线程池 void shutdown(); // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表 // 此方法一般不使用 List<Runnable> shutdownNow(); // 执行器是否已经被关闭 boolean isShutdown(); // 只有当shutdown()或者shutdownNow()被调用,而且所有任务都执行完成后才会返回true boolean isTerminated(); // 阻塞。直到所有的任务都被执行完,或者超时 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // =====下面为任务提交方法(使用Future跟踪任务执行情况)===== // 以下三个是最最最最最最最常用的方法:执行任务 // Callable任务有返回值。所以Future.get()的时候可以拿到此返回值 <T> Future<T> submit(Callable<T> task); // Runnable任务。执行完后Future.get()永远返回的是result这个值 <T> Future<T> submit(Runnable task, T result); // 执行完后Future.get()永远返回的是null Future<?> submit(Runnable task); // 这几个方法在**批量执行**或**多选一**的业务场景中非常方便。 // invokeAll:所有任务都完成(包括成功/被中断/超时)后才会返回,所以它会阻塞哦(时间由最长的决定) // invokeAny()在任意一个任务成功(被中断/超时)后就会返回,只需要成功一个就返回(时间由最短的决定) <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
其实提交的Runnable任务最终都会通过Executors.callable(runnable, result)适配为一个Callable<V>去执行的,具体源码处有兴趣的可以看一看。
从类图中可以看出,ExecutorService的实现分为两个分支:左边的AbstractExecutorService(一般为基于线程池的实现),以及右边的ScheduledExecutorService延迟/周期执行服务,下面也得分别作出解释。
画外音:submit/invokeAll等方法如何去执行任务,调用execute()放入线程池or周期性执行,或者结合在一起?这是它的两大方向