使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果
整体的接口继承关系如下:
结合上图和源码可以厘清接口类关系:
- Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
- 然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
- 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
- 然后ThreadPoolExecutor继承了类AbstractExecutorService。
最终我们使用的就是ThreadPoolExecutor。
线程池的优势
我们为什么要使用线程池呢?包括上边提到的线程复用,线程池的优势可以总结如下:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
也就是我们可以通过维护和管理线程来降低使用成本。
线程池基本概念
线程池的真正实现类是ThreadPoolExecutor,ThreadPoolExecutor
继承了AbstractExecutorService
类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作,4个构造器示例如下:
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 ||maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
参数列表
可以看到,其需要如下几个参数:
- corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将
allowCoreThreadTimeout
设置为true时,核心线程也会超时回收。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 - maximumPoolSize(必需):线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞
- keepAliveTime(必需):表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit(必需):指定keepAliveTime参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
- workQueue(必需):任务队列。通过线程池的execute()方法提交的Runnable对象(等待执行的任务)将存储在该参数中。其采用
阻塞队列
实现。ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关,关于队列和并发集合在本系列的其它Blog进行介绍 - threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。线程工厂指定创建线程的方式,需要实现ThreadFactory接口,并实现newThread(Runnable r)方法。
- handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。
了解了具体参数后我们再看看其中几个参数的可选项。
参数说明
我们可以看到,所有参数中,有几个参数是可以选择不同的实现方式的 ,我们分别探讨下:
阻塞队列workQueue选择
任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在Java中需要实现BlockingQueue接口。但Java已经为我们提供了7种阻塞队列的实现:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)
- LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为Integer.MAX_VALUE
- PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
- DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
- SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
- LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样FIFO(先进先出),也可以像栈一样FILO(先进后出)。
- LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的结合体,但是把它用在ThreadPoolExecutor中,和LinkedBlockingQueue行为一致,但是是无界的阻塞队列。
注意有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置maximumPoolSize没有任何意义
线程工厂threadFactory默认值
threadFactory参数可以不用指定,Executors框架已经为我们实现了一个默认的线程工厂,一般情况下使用默认即可:
/** * The default thread factory. */ private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
拒绝策略(handler)
当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现RejectedExecutionHandler
接口,并实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)
方法。不过Executors框架已经为我们实现了4种拒绝策略:
- AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
- CallerRunsPolicy:在任务被拒绝添加后,会由调用execute方法的的主线程来执行被拒绝的任务
- DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
- DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。
四种策略都可以进行选择
线程池状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; //runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性 static final int RUNNING = 0;//当创建线程池后,初始时,线程池处于RUNNING状态 //如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕 static final int SHUTDOWN = 1; //如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务 static final int STOP = 2; //当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态 static final int TERMINATED = 3;
线程池执行流程
了解了参数的配置和方法,我们来看看线程如何运转。
方法调用
线程池的使用示例如下:
// 创建线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) // 向线程池提交任务 threadPool.execute(new Runnable() { @Override public void run() { ... // 线程执行的任务 } }); // 关闭线程池 threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程 // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表 threadPool.shutdownNow();
在ThreadPoolExecutor类中有几个非常重要的方法:
- execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行
- submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown()和shutdownNow()
是用来关闭线程池的。shutdown不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务, shutdownNow立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
还有很多其他的方法:比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()
等获取与线程池相关属性的方法