1 Executor & 概述
Executor是顶级接口。关于线程池的总览示意图如下图所示:
申请线程实例时会先从核心线程corePool中获取,如果核心线程满了之后线程会先加入到工作队列中,工作队列也满了的话也允许继续申请,直至maxnumPoolSize。之后会执行拒绝策略RejectedExecutionHandler。
ThreadFactory是worker中构建线程实例的工厂。
使用线程池的好处如下:
可以复用线程、控制最大并发数。
实现任务线程队列缓存策略和拒绝机制。
实现如定时执行、周期执行等与时间相关的功能。
隔离线程环境。
比如,为交易服务和搜索服务分别开两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免线程间互相影响。
Executor executor = new ExecutorSubClass(); //线程池实现类
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
Executors 类为 Executor 提供了工厂方法。ExecutorService 是 Executor 接口的默认实现,下面是使用 ExecutorService 创建线程的几种方式。
2 Executors中对线程池的实现
2.1 CachedThreadPool
public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); for(int i = 0;i < 5;i++){ service.execute(new TestThread()); } service.shutdown(); }
CachedThreadPool 会为每个任务都创建一个线程。
ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定Executor类型。调用 shutDown 可以防止新任务提交给 ExecutorService,这个线程在 Executor 中所有任务完成后退出。
2.2 FixedThreadPool
FixedThreadPool 可以使用有限的线程集来启动多线程。
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); for(int i = 0;i < 5;i++){ service.execute(new TestThread()); } service.shutdown(); }
FixedThreadPool 可以一次性的预先执行高昂的线程分配,因此也就可以限制线程的数量。因为不必为每个任务都固定的付出创建线程的时间开销,所以可以节省时间。
2.3 SingleThreadExecutor
SingleThreadExecutor 就是线程数量为1的 FixedThreadPool,如果向SingleThreadPool一次性提交了多个任务,那么这些任务将会排队,所有的任务都将使用相同的线程。SingleThreadPool 会序列化所有提交给他的任务,并会维护一个隐藏的挂起队列。
public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i = 0;i < 5;i++){ service.execute(new TestThread()); } service.shutdown(); }
可以用 SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行。
2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1)
结合上面的介绍,自然会想到一个问题:既然已经有了newFixedThreadPool,为什么还要存在newSingleThreadExecutor这个方法?
结合jdk中的说明“Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.”。newSingleThreadExecutor和newFixedThreadPool(1)确实是有区别的,区别在于newSingleThreadExecutor返回的线程池保证不能被重新配置(重新调整线程池大小等)
对比源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ()); }
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ()); }
二者及其相似,连ThreadPoolExecutor对象的参数值都一样的,只不过newFixedThreadPool返回了一个ThreadPoolExecutor对象,newSingleThreadExecutor返回了一个被FinalizableDelegatedExecutorService包装过的ThreadPoolExecutor对象,问题其实就出在FinalizableDelegatedExecutorService上。
容量为1的FixedThreadPool的属性(容量等)可以通过将其强转为ThreadPoolExecutor而被重新进行配置;
SingleThreadPool实际是一个FinalizableDelegatedExecutorService类的对象,把诸如setCorePoolSize的方法给去掉了,并且该类没有继承任何可以配置线程池的类,因此可以保证它不能被再次配置。
2.4 SingleThreadScheduledExrcutor
创建一个可以周期性执行任务的单线程线程池。
public class TestMain { //格式化 static SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //AtomicInteger用来计数 static AtomicInteger number = new AtomicInteger(); public static void main(String[] args) throws Exception { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); for (int i = 0; i < 3; i++) { executorService.schedule(new Runnable() { @Override public void run() { System.out.println("第" + number.incrementAndGet() + "周期线程运行当前时间【" + sim.format(new Date()) + "】"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 3L, TimeUnit.SECONDS); } System.out.println("主线程运行当前时间【" + sim.format(new Date()) + "】"); } }
2.5 ScheduledThreadPool
ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2); scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { loge("time:"); } }, 0, 40, TimeUnit.MILLISECONDS); //0表示首次执行任务的延迟时间,40表示每次执行任务的间隔时间,TimeUnit.MILLISECONDS执行的时间间隔数值单位
5 ThreadPoolExecutor
5.1 概述
上面说了那么多,其实都不是推荐的方法,阿里巴巴的编程手册中有这样的描述:
线程池不允许使用Executors去创建。
而是通过ThreadPoolExecutor的方式,
这样的处理方式让写的读者更加明确线程池的运行规则,规则资源耗尽的风险。
具体原因是:
fixedThreadPool 和 singleThreadExecutor 对于排队的队列没有数量限制,最大支持Integer.MAX_VALUE个;
cachedThreadPool 和 scheduledThreadPool 中最大线程数可以达到 Integer.MAX_VALUE 个;
当线程过多的时候,这些方法就容易造成OOM了
当我们去看 Executors 的源码就会发现,Executors.newFixedThreadPool 、Executors.newSingleThreadPool 、Executors.newCachedThreadPool 、Executors.newScheduledThreadPool等方法的底层都是 ThreadPoolExecutor 实现的,其中执行周期任务得益于DelayedWorkedQueue的使用,而这些线程池又不是被推荐使用的,所以有必要好好研究一下 ThreadPoolExecutor 以便自定义线程池,这样才可以更加明确的线程池的运行规则,规避资源耗尽的风险。
ThreadPoolExecutor 的核心参数值得是他在构造时需要传递的参数,其构造参数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize: 线程池的常驻核心线程数
如果设置为0,则表示在没有任何任务时,销毁线程池;
如果大于0,即使没有任务时也会保证线程池的线程数量等于此值。
需要注意的是,如果此值设置的比较小,则会频繁的创建和销毁线程。如果设置的比较大,则会浪费系统资源。
int maximumPoolSize:线程池最大可以创建的线程数
官方规定此参数必须大于0,也必须大于等于 corePoolSize ,此值只有在任务比较多,且任务队列中已被存满时才会用到。
long keepAliveTime : 线程的存活时间
当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量等于corePoolSize 。
如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候不会销毁任何线程。
TimeUnit unit:存活时间的单位,是配合 keepAliveTime 参数共同使用的。
BlockingQueue<Runnable> workQueue : 线程池执行的任务队列
当线程池中的所有线程都在处理任务时,如果来了新任务就会缓存到次任务队列中排队等待执行。
ThreadFactory threadFactory:线程的创建工厂
此参数一般用的较少,如果创建线程时不指定此参数,则会使用默认的现场创建工厂的方法来创建线程。
RejectedExecutionHandler handler :指定线程池的拒绝策略。
当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执此任务时,就会用到此拒绝策略。
5.2 先从CAPACITY的初始化开始说起:
@Native public static final int SIZE = 32; private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 private static final int STOP = 1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 private static final int TIDYING = 2 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 private static final int TERMINATED = 3 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000
COUNT_BITS 值为29,CAPACITY的计算如下图:
为什么最后要 - 1,原因是和一下两个方法有关,获得运行状态和获得当前活动线程数:
// 获取运行状态 RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED private static int runStateOf(int c) { // - CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000 return c & ~CAPACITY; } // 取出低29位的值,表示获得当前活动的线程数 private static int workerCountOf(int c) { // CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111 return c & CAPACITY; }
前三位是0,后29位是1的序列可以非常方便的取出c的前三位和后29位,即,runState和workerCount,是存储在一个叫ctl的变量中的。
5.3 线程池运行状态和活动线程数
RUNNING状态可以转换为SHUTDOWN或STOP状态,具体如下:
SHUTDOWN和STOP相当于一个中间状态,最终所有任务都停止了时会进入TIDYING状态。
5.4 上述线程池的构造函数
5.5 execute
执行流程如下图:
简化版的流程图:
对应的带注释的源代码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** ctl记录着workCount和runState */ int c = ctl.get(); /** case1:如果线程池中的线程数量小于核心线程数,那么创建线程并执行 */ if (workerCountOf(c) < corePoolSize) { // workerCountof(c):获取当前活动线程数 /** * 在线程池中新建一个新的线程 * command:需要执行的Runnable线程 * true:新增线程时,【当前活动的线程数】是否 < corePoolSize * false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize */ if (addWorker(command, true)) return; // 添加新线程失败,则重新获取【当前活动的线程数】 c = ctl.get(); } /** 第二步:如果当前线程池是运行状态 且 任务添加到队列成功(即,case2:如果workCount >= corePoolSize) */ if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中 // 重新获取ctl int recheck = ctl.get(); // 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚放入到workQueue队列中的command移除掉 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0){ // 如果【当前活动的线程数】为0,则执行addWorker方法 /** * null:只创建线程,但不去启动 * false:添加线程时,根据maximumPoolSize来判断 * * 如果workerCountOf(recheck) > 0,则直接返回,在队列中的command稍后会出队列并且执行 */ addWorker(null, false); } /** * 第三步:满足以下两种条件之一,进入第三步判断语句 * case1:线程池不是正在运行状态,即:isRunning(c)==false * case2:workCount>=corePoolSize 并且添加workQueue队列失败。即:workQueue.offer(command)== false * 由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则进入方法体执行reject(command) * 如果是true的话则和核心线程数进行比较 */ } else if (!addWorker(command, false)) reject(command); // 执行线程创建失败的拒绝策略 }
5.5.1 addWorker
execute中经常使用到的一个重要方法就是addWorker(),效果是在线程池中添加一个新的线程。
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 步骤1:试图将workerCount + 1 for (;;) { int c = ctl.get(); // 获得运行状态runState int rs = runStateOf(c); /** * 只有如下两种情况可以新增worker,继续执行下去: * case one:rs==RUNNING * case two:rs==SHUTDOWN && firstTask ==null&&!workQueue.isEmpty() */ if (rs >= SHUTDOWN && // 非RUNNING状态。线程池异常,表示不再去接收新的线程任务了,返回false /** * 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以: * case1:如果firstTask!=nul1,表示要添加新任务,则:新增worker失败,返回false。 * case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。则:新增worker失败,返回false */ ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; /** /** * 试图将workerCount + 1 */ for (;;) { // 获取当前线程池里的线程数 int wc = workerCountOf(c); /** * 满足如下任意情况,则新增worker失败,返回false * case1:大于等于最大线程容量,即:536870911 * case2:当core是true时;>= 核心线程数 * 当core是false时:>= 最大线程数 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 当前工作线程数加1 if (compareAndIncrementWorkerCount(c)) break retry; // 成功加1,则跳出retry标识的这两层for循环 c = ctl.get(); // Re-read ctl // 如果线程数加1操作失败,则获取当前最新的线程池运行状态,来判断与rs是否相同,如果不同,则说明方法处理期间线程池运行状态发生了变化,里新获取最新runState if (runStateOf(c) != rs) continue retry; // 跳出内层for循环,继续从第一个for执行 } } /** * 步骤二:创建Worker,加入集合workers中,并启动Worker线程 */ boolean workerStarted = false; // 用于判断新的worker实例是否已经开始执行Thread.start boolean workerAdded = false; // 用于判断新的worker实例是否已经被添加到线程池 Worker w = null; // AQS.Worker try { // 创建Worker实例,每个Worker对象都会创建一个线程 w = new Worker(firstTask); // 获取包含work的线程 final Thread t = w.thread; if (t != null) { // 重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 如果能获得全局mainlock锁,则执行,否则阻塞 try { // 获得线程池当前的运行状态runStatus int rs = runStateOf(ctl.get()); /** * 满足如下任意条件,即可向线程池中添加线程: * case1:线程池状态为RUNNING。 * case2:线程池状态为SHUTDOWN并且firstTask为空。 */ if (rs < SHUTDOWN || // 只有rs=RUNNING才满足 (rs == SHUTDOWN && firstTask == null)) { // 线程池关闭,传入线程任务为null if (t.isAlive()) // 因为t是新构建的线程,还没有启动,所以如果是alive状态,说明已经被启动 throw new IllegalThreadStateException(); workers.add(w); // workers中保存线程池中存在的所有work实例集合 int s = workers.size(); if (s > largestPoolSize) // largestPoolSize用于记录线程池中曾经存在的最大的线程数量 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 开启线程 workerStarted = true; } } } finally { if (! workerStarted) // 如果没有开启线程 addWorkerFailed(w); } return workerStarted; }