AbstractExecutorService
对ExecutorService
的抽象实现:它实现了接口的部分方法,但是它并没有存放任务或者线程的数组或者Collection,也就是说它依旧和线程池没有半毛钱关系。
// @since 1.5 public abstract class AbstractExecutorService implements ExecutorService { @Override public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } ... // 其它submit方法原理一样,底层执行调用的均是Executor#execute方法 }
关于invokeAll()/invokeAny()等方法的执行源码此处就不铺开了,记住结论即可:批量执行时使用特别方便(注意全部成功or任意一个成功的区别)。
手写实现AbstractExecutorService
本文以一个非常简单手写实例来告诉你任务执行器的效果,进一步让你感受到它目前还和线程池木有半毛钱关系。
private static class MyExecutorService extends AbstractExecutorService { @Override public void shutdown() { System.out.println("关闭执行器,释放资源"); } @Override public List<Runnable> shutdownNow() { System.out.println("立刻关闭执行器,释放资源"); return Collections.emptyList(); } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } // 执行任务(本处使用异步执行) @Override public void execute(Runnable command) { new Thread(command).start(); } }
准备一个方法,用于产生任务
// period:任务执行耗时 单位s private Runnable createTask(int period) { return () -> { try { TimeUnit.SECONDS.sleep(period); } catch (InterruptedException e) { e.printStackTrace(); } String currThreadName = Thread.currentThread().getName(); System.out.println("线程[" + currThreadName + "] 我是异步执行的,耗时" + period + "s"); }; }
书写测试方法:
@Test public void fun3() throws InterruptedException, ExecutionException { String mainThreadName = Thread.currentThread().getName(); System.out.println("----------主线程[" + mainThreadName + "]开始----------"); ExecutorService executorService = new MyExecutorService(); Instant start = Instant.now(); Future<?> submit = executorService.submit(createTask(3)); System.out.println("结果为:" + submit.get()); Instant end = Instant.now(); System.out.println("总耗时为:" + Duration.between(start, end).getSeconds()); executorService.shutdown(); }
运行程序,打印:
----------主线程[main]开始---------- 线程[Thread-0] 我是异步执行的,耗时3s 结果为:null 总耗时为:0 关闭执行器,释放资源
下面仅需改一下,把get放在上面:
System.out.println("结果为:" + submit.get()); Instant end = Instant.now(); System.out.println("总耗时为:" + Duration.between(start, end).getSeconds());
再次运行打印:
----------主线程[main]开始---------- 线程[Thread-0] 我是异步执行的,耗时3s 结果为:null 总耗时为:3 关闭执行器,释放资源
另外,关于其它submit方法,以及批量执行的invokeAll/invokeAny方法,各位可自行测试哈。下面介绍JDK自带的,Doug Lea大神给我们提供的实现类,也是最最最最最为重要的一个类:ThreadPoolExecutor。
ThreadPoolExecutor 带线程池的执行器
顾名思义,它是一个内置线程池的执行器,也就是说:它会把Runnable任务的执行均扔进线程池里面进行执行,效率最高。
注意:ThreadPoolTaskExecutor它是Spirng提供的,基于ThreadPoolExecutor进行包装实现,请勿弄混了。
本文并不会解释为何需要线程池,以及构建线程池的七大参数都是什么意思,而只会站在使用以及基础原理的角度做出示例和说明。
public class ThreadPoolExecutor extends AbstractExecutorService { // 核心线程数 private volatile int corePoolSize; // 最大线程数 private volatile int maximumPoolSize; // 任务队列(当任务太多了,就放在这里排队) private final BlockingQueue<Runnable> workQueue; // 空闲线程的超时时间,超时就回收它(非core线程) private volatile long keepAliveTime; // 不解释 private volatile ThreadFactory threadFactory; // 线程池拒绝处理函数(如任务太多了,如何拒绝) // 默认策略是:AbortPolicy。要决绝是抛出异常:RejectedExecutionException private volatile RejectedExecutionHandler handler; }
通过上里手工模拟可知道最重要的是对execute()方法的实现:它决定了你最终如何去执行任务(同步or异步?用老的线程还是用新的线程等等)。