101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(一)
前提
很早之前就打算看一次JUC线程池ThreadPoolExecutor的源码实现,由于近段时间比较忙,一直没有时间整理出源码分析的文章。之前在分析扩展线程池实现可回调的Future时候曾经提到并发大师Doug Lea在设计线程池ThreadPoolExecutor的提交任务的顶层接口Executor只有一个无状态的执行方法:
public interface Executor { void execute(Runnable command); }
而ExecutorService提供了很多扩展方法底层基本上是基于Executor#execute()方法进行扩展。本文着重分析ThreadPoolExecutor#execute()的实现,笔者会从实现原理、源码实现等角度结合简化例子进行详细的分析。ThreadPoolExecutor的源码从JDK8到JDK11基本没有变化,本文编写的时候使用的是JDK11。
ThreadPoolExecutor的原理
ThreadPoolExecutor里面使用到JUC同步器框架AbstractQueuedSynchronizer(俗称AQS)、大量的位操作、CAS操作。ThreadPoolExecutor提供了固定活跃线程(核心线程)、额外的线程(线程池容量 - 核心线程数这部分额外创建的线程,下面称为非核心线程)、任务队列以及拒绝策略这几个重要的功能。
JUC同步器框架
ThreadPoolExecutor里面使用到JUC同步器框架,主要用于四个方面:
全局锁mainLock成员属性,是可重入锁ReentrantLock类型,主要是用于访问工作线程Worker集合和进行数据统计记录时候的加锁操作。
条件变量termination,Condition类型,主要用于线程进行等待终结awaitTermination()方法时的带期限阻塞。
任务队列workQueue,BlockingQueue类型,任务队列,用于存放待执行的任务。
工作线程,内部类Worker类型,是线程池中真正的工作线程对象。
关于AQS笔者之前写过一篇相关源码分析的文章:JUC同步器框架AbstractQueuedSynchronizer源码图文分析。
核心线程
这里先参考ThreadPoolExecutor的实现并且进行简化,实现一个只有核心线程的线程池,要求如下:
暂时不考虑任务执行异常情况下的处理。
任务队列为无界队列。
线程池容量固定为核心线程数量。
暂时不考虑拒绝策略。
public class CoreThreadPool implements Executor { private BlockingQueue<Runnable> workQueue; private static final AtomicInteger COUNTER = new AtomicInteger(); private int coreSize; private int threadCount = 0; public CoreThreadPool(int coreSize) { this.coreSize = coreSize; this.workQueue = new LinkedBlockingQueue<>(); } @Override public void execute(Runnable command) { if (++threadCount <= coreSize) { new Worker(command).start(); } else { try { workQueue.put(command); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } private class Worker extends Thread { private Runnable firstTask; public Worker(Runnable runnable) { super(String.format("Worker-%d", COUNTER.getAndIncrement())); this.firstTask = runnable; } @Override public void run() { Runnable task = this.firstTask; while (null != task || null != (task = getTask())) { try { task.run(); } finally { task = null; } } } } private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } public static void main(String[] args) throws Exception { CoreThreadPool pool = new CoreThreadPool(5); IntStream.range(0, 10) .forEach(i -> pool.execute(() -> System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i)))); Thread.sleep(Integer.MAX_VALUE); } }
某次运行结果如下:
Thread:Worker-0,value:0 Thread:Worker-3,value:3 Thread:Worker-2,value:2 Thread:Worker-1,value:1 Thread:Worker-4,value:4 Thread:Worker-1,value:5 Thread:Worker-2,value:8 Thread:Worker-4,value:7 Thread:Worker-0,value:6 Thread:Worker-3,value:9
设计此线程池的时候,核心线程是懒创建的,如果线程空闲的时候则阻塞在任务队列的take()方法,其实对于ThreadPoolExecutor也是类似这样实现,只是如果使用了keepAliveTime并且允许核心线程超时(allowCoreThreadTimeOut设置为true)则会使用BlockingQueue#poll(keepAliveTime)进行轮询代替永久阻塞。
其他附加功能
构建ThreadPoolExecutor实例的时候,需要定义maximumPoolSize(线程池最大线程数)和corePoolSize(核心线程数)。当任务队列是有界的阻塞队列,核心线程满负载,任务队列已经满的情况下,会尝试创建额外的maximumPoolSize - corePoolSize个线程去执行新提交的任务。当ThreadPoolExecutor这里实现的两个主要附加功能是:
一定条件下会创建非核心线程去执行任务,非核心线程的回收周期(线程生命周期终结时刻)是keepAliveTime,线程生命周期终结的条件是:下一次通过任务队列获取任务的时候并且存活时间超过keepAliveTime。
提供拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。
源码分析
先分析线程池的关键属性,接着分析其状态控制,最后重点分析ThreadPoolExecutor#execute()方法。
关键属性
public class ThreadPoolExecutor extends AbstractExecutorService { // 控制变量-存放状态和线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 任务队列,必须是阻塞队列 private final BlockingQueue<Runnable> workQueue; // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合 private final HashSet<Worker> workers = new HashSet<>(); // 全局锁 private final ReentrantLock mainLock = new ReentrantLock(); // awaitTermination方法使用的等待条件变量 private final Condition termination = mainLock.newCondition(); // 记录峰值线程数 private int largestPoolSize; // 记录已经成功执行完毕的任务数 private long completedTaskCount; // 线程工厂,用于创建新的线程实例 private volatile ThreadFactory threadFactory; // 拒绝执行处理器,对应不同的拒绝策略 private volatile RejectedExecutionHandler handler; // 空闲线程等待任务的时间周期,单位是纳秒 private volatile long keepAliveTime; // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效 private volatile boolean allowCoreThreadTimeOut; // 核心线程数 private volatile int corePoolSize; // 线程池容量 private volatile int maximumPoolSize; // 省略其他代码 }
下面看参数列表最长的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:
corePoolSize:int类型,核心线程数量。
maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。
keepAliveTime:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。
unit:TimeUnit类型,keepAliveTime参数的时间单位,实际上keepAliveTime最终会转化为纳秒。
workQueue:BlockingQueue类型,等待队列或者叫任务队列。
threadFactory:ThreadFactory类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。
handler:RejectedExecutionHandler 类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:
AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。
DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。
DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。
CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。
状态控制
状态控制主要围绕原子整型成员变量ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 通过ctl值获取运行状态 private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 通过ctl值获取工作线程数 private static int workerCountOf(int c) { return c & COUNT_MASK; } // 通过运行状态和工作线程数计算ctl的值,或运算 private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } // CAS操作线程数增加1 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } // CAS操作线程数减少1 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } // 线程数直接减少1 private void decrementWorkerCount() { ctl.addAndGet(-1); }
接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:
我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有23种。工作线程上限数量为229 - 1,超过5亿,这个数量在短时间内不用考虑会超限。
接着看工作线程上限数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:
然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING
状态:
// -1的补码为:111-11111111111111111111111111111 // 左移29位后:111-00000000000000000000000000000 // 10进制值为:-536870912 // 高3位111的值就是表示线程池正在处于运行状态 private static final int RUNNING = -1 << COUNT_BITS;
控制变量ctl的组成就是通过线程池运行状态rs和工作线程数wc通过或运算得到的:
// rs=RUNNING值为:111-00000000000000000000000000000 // wc的值为0:000-00000000000000000000000000000 // rs | wc的结果为:111-00000000000000000000000000000 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; }
那么我们怎么从ctl中取出高3位的线程池状态?上面源码中提供的runStateOf()方法就是提取运行状态:
// 先把COUNT_MASK取反(~COUNT_MASK), 得到:111-00000000000000000000000000000 // ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy // 两者做一次与运算即可得到高3位xxx private static int runStateOf(int c){ return c & ~COUNT_MASK; }
同理,取出低29位的工作线程数量只需要把ctl和COUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。
工作线程数为0的前提下,小结一下线程池的运行状态常量:
这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl进行比较,或者使用ctl和线程池状态常量进行比较)来比较和判断线程池的状态:
工作线程数为0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
下面这三个方法就是使用这种技巧:
// ctl和状态常量比较,判断是否小于 private static boolean runStateLessThan(int c, int s) { return c < s; } // ctl和状态常量比较,判断是否小于或等于 private static boolean runStateAtLeast(int c, int s) { return c >= s; } // ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态 private static boolean isRunning(int c) { return c < SHUTDOWN; }
最后是线程池状态的跃迁图:
]
PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status。