AbstractExecutorService 抽象类
AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 中的部分方法,它相当一个干将,会分析大管家有哪些要做的工作,然后针对大管家的要求做一些具体的规划,然后找他的得力助手 ThreadPoolExecutor
来完成目标。
AbstractExecutorService 这个抽象类主要实现了 invokeAll
和 invokeAny
方法,关于这两个方法的源码分析我们会在后面进行解释。
ScheduledExecutorService 接口
ScheduledExecutorService 也是一个接口,它扩展了 ExecutorService 接口,提供了 ExecutorService 接口所没有的功能,ScheduledExecutorService 顾名思义就是一个定时执行器
,定时执行器可以安排命令在一定延迟时间后运行或者定期执行。
它主要有三个接口方法,一个重载方法。下面我们先来看一下这两个重载方法。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
schedule
方法能够延迟一定时间后执行任务,并且只能执行一次。可以看到,schedule 方法也返回了一个 ScheduledFuture
对象,ScheduledFuture 对象扩展了 Future 和 Delayed 接口,它表示异步延迟计算的结果。schedule 方法支持零延迟和负延迟,这两类值都被视为立即执行任务。
还有一点需要说明的是,schedule 方法能够接收相对的时间和周期作为参数,而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 来得到相对的时间间隔。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
scheduleAtFixedRate 表示任务会根据固定的速率在时间 initialDelay
后不断地执行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
这个方法和上面的方法很类似,它表示的是以固定延迟时间的方式来执行任务。
scheduleAtFixedRate 和 scheduleWithFixedDelay 这两个方法容易混淆,下面我们通过一个示例来说明一下这两个方法的区别。
public class ScheduleTest { public static void main(String[] args) { Runnable command = () -> { long startTime = System.currentTimeMillis(); System.out.println("current timestamp = " + startTime); try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("time spend = " + (System.currentTimeMillis() - startTime)); }; ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS); } }
输出结果大致如下
可以看到,每次打印出来 current timestamp 的时间间隔大约等于 1000 毫秒,所以可以断定 scheduleAtFixedRate
是以恒定的速率来执行任务的。
然后我们再看一下 scheduleWithFixedDelay
方法,和上面测试类一样,只不过我们把 scheduleAtFixedRate 换为了 scheduleWithFixedDelay 。
scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);
然后观察一下输出结果
可以看到,两个 current timestamp 之间的间隔大约等于 1000(固定时间) + delay(time spend) 的总和,由此可以确定 scheduleWithFixedDelay
是以固定时延来执行的。
线程池的描述
下面我们先来认识一下什么是线程池,线程池从概念上来看就是一个池子
,什么池子呢?是指管理同一组工作线程的池子,也就是说,线程池会统一管理内部的工作线程。
wiki 上说,线程池其实就是一种软件设计模式,这种设计模式用于实现计算机程序中的并发。
比如下面就是一个简单的线程池概念图。
注意:这个图只是一个概念模型,不是真正的线程池实现,希望读者不要混淆。
可以看到,这种其实也相当于是生产者-消费者模型,任务队列中的线程会进入到线程池中,由线程池进行管理,线程池中的一个个线程就是工作线程,工作线程执行完毕后会放入完成队列中,代表已经完成的任务。
上图有个缺点,那就是队列中的线程执行完毕后就会销毁,销毁就会产生性能损耗,降低响应速度,而我们使用线程池的目的往往是需要把线程重用起来,提高程序性能。
所以我们应该把执行完成后的工作线程重新利用起来,等待下一次使用。
线程池创建
我们上面大概聊了一下什么线程池的基本执行机制,你知道了线程是如何复用的,那么任何事物不可能是凭空出现的,线程也一样,那么它是如何创建出来的呢?下面就不得不提一个工具类,那就是 Executors
。
Executors 也是java.util.concurrent
包下的成员,它是一个创建线程池的工厂,可以使用静态工厂方法来创建线程池,下面就是 Executors 所能够创建线程池的具体类型。
newFixedThreadPool
:newFixedThreadPool 将会创建固定数量的线程池,这个数量可以由程序员通过创建Executors.newFixedThreadPool(int nThreads)
时手动指定,每次提交一个任务就会创建一个线程,在任何时候,nThreads 的值是最多允许活动的线程。如果在所有线程都处于活跃状态时有额外的任务被创建,这些新创建的线程会进入等待队列等待线程调度。如果有任何线程由于执行期间出现意外导致线程终止
,那么在执行后续任务时会使用等待队列中的线程进行替代。newWorkStealingPool
:newWorkStealingPool 是 JDK1.8 新增加的线程池,它是基于fork-join
机制的一种线程池实现,使用了Work-Stealing
算法。newWorkStealingPool 会创建足够的线程来支持并行度,会使用多个队列来减少竞争。work-stealing pool 线程池不会保证提交任务的执行顺序。newSingleThreadExecutor
:newSingleThreadExecutor 是一个单线程的执行器,它只会创建单个
线程来执行任务,如果这个线程异常结束,则会创建另外一个线程来替代。newSingleThreadExecutor 会确保任务在任务队列中的执行次序,也就是说,任务的执行是有序的
。newCachedThreadPool
:newCachedThreadPool 会根据实际需要创建一个可缓存的线程池。如果线程池的线程数量超过实际需要处理的任务,那么 newCachedThreadPool 将会回收多余的线程。如果实际需要处理的线程不能满足任务的数量,则回你添加新的线程到线程池中,线程池中线程的数量不存在任何限制。newSingleThreadScheduledExecutor
:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很类似,只不过带有 scheduled 的这个执行器哥们能够在一定延迟后执行或者定期执行任务。newScheduledThreadPool
:这个线程池和上面的 scheduled 执行器类似,只不过 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一个DelegatedScheduledExecutorService
代理,这其实包装器设计模式的体现。
上面这些线程池的底层实现都是由 ThreadPoolExecutor 来提供支持的,所以要理解这些线程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我们就来聊一聊 ThreadPoolExecutor。
ThreadPoolExecutor 类
ThreadPoolExecutor
位于 java.util.concurrent
工具类下,可以说它是线程池中最核心的一个类了。如果你要想把线程池理解透彻的话,就要首先了解一下这个类。
如果我们再拿上面家族举例子的话,ThreadPoolExecutor 就是一个家族的骨干人才,家族顶梁柱。ThreadPoolExecutor 做的工作真是太多太多了。
首先,ThreadPoolExecutor 提供了四个构造方法,然而前三个构造方法最终都会调用最后一个构造方法进行初始化
public class ThreadPoolExecutor extends AbstractExecutorService { ..... // 1 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); // 2 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); // 3 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); // 4 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
所以我们直接就来看一波最后这个线程池,看看参数都有啥,如果我没数错的话,应该是有 7 个参数(小学数学水平。。。。。。)
- 首先,一个非常重要的参数就是
corePoolSize
,核心线程池的容量/大小,你叫啥我觉得都没毛病。只不过你得理解这个参数的意义,它和线程池的实现原理有非常密切的关系。你刚开始创建了一个线程池,此时是没有任何线程的,这个很好理解,因为我现在没有任务可以执行啊,创建线程干啥啊?而且创建线程还有开销啊,所以等到任务过来时再创建线程也不晚。但是!我要说但是了,如果调用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就会在没有任务到来时创建线程,前者是创建 corePoolSize 个线程,后者是只创建一个线程。Lea 爷爷本来想让我们程序员当个懒汉
,等任务来了再干;可是你非要当个饿汉
,提前完成任务。如果我们想当个懒汉的话,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列
当中。
maximumPoolSize
:又来一个线程池的容量,只不过这个是线程池的最大容量,也就是线程池所能容纳最大的线程,而上面的 corePoolSize 只是核心线程容量。
我知道你此时会有疑问,那就是不知道如何核心线程的容量和线程最大容量的区别是吧?我们后面会解释这点。
keepAliveTime
:这个参数是线程池的保活机制
,表示线程在没有任务执行的情况下保持多久会终止。在默认情况下,这个参数只在线程数量大于 corePoolSize 时才会生效。当线程数量大于 corePoolSize 时,如果任意一个空闲的线程的等待时间 > keepAliveTime 后,那么这个线程会被剔除,直到线程数量等于 corePoolSize 为止。如果调用了 allowCoreThreadTimeOut 方法,线程数量在 corePoolSize 范围内也会生效,直到线程减为 0。unit
:这个参数好说,它就是一个TimeUnit
的变量,unit 表示的是 keepAliveTime 的时间单位。unit 的类型有下面这几种
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue
:这个参数表示的概念就是等待队列,我们上面说过,如果核心线程 > corePoolSize 的话,就会把任务放入等待队列,这个等待队列的选择也是一门学问。Lea 爷爷给我们展示了三种等待队列的选择
SynchronousQueue
: 基于阻塞队列(BlockingQueue)
的实现,它会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用 SynchronousQueue 阻塞队列一般要求maximumPoolSizes 为无界,也就是 Integer.MAX_VALUE,避免线程拒绝执行操作。LinkedBlockingQueue
:LinkedBlockingQueue 是一个无界缓存等待队列。当前执行的线程数量达到 corePoolSize 的数量时,剩余的元素会在阻塞队列里等待。ArrayBlockingQueue
:ArrayBlockingQueue 是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于 corePoolSize 时,多余的元素缓存在 ArrayBlockingQueue 队列中等待有空闲的线程时继续执行,当 ArrayBlockingQueue 已满时,加入 ArrayBlockingQueue 失败,会开启新的线程去执行,当线程数已经达到最大的 maximumPoolSizes 时,再有新的元素尝试加入 ArrayBlockingQueue时会报错
threadFactory
:线程工厂,这个参数主要用来创建线程;handler
:拒绝策略,拒绝策略主要有以下取值
AbortPolicy
:丢弃任务并抛出 RejectedExecutionException 异常。DiscardPolicy
: 直接丢弃任务,但是不抛出异常。DiscardOldestPolicy
:直接丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。CallerRunsPolicy
:由调用线程处理该任务。