零、线程池宏观了解
线程池是一中池化技术,内部维护一定数量的线程,当有任务体提交的时候可以直接执行任务,避免了线程的创建和开销损耗。
一、Java构建线程的方式
- 继承Thread,也是实现了Runable
- 实现Runable
- 实现Callable,有返回值
- 线程池,java提供构成线程池的方式
Executors类可以创建线程池,阿里云不允许使用这种方式创建线程池,因为对线程池的控制力度太低了。
推荐使用构造函数手动创建线程池,也就引入了线程池的七大参数
二、线程池七大参数
核心线程数
最大线程数
最大空闲时间:多于核心线程的线程存活时间
时间单位
阻塞队列:最大线程数满了加入队列
线程工厂:给线程起个名字,方便线上排查问题
拒绝策略:
- 丢弃任务抛出异常
- 丢出任务
- 丢弃队列最前面的任务,重新提交被拒绝的任务
- 由提交任务的线程处理该任务
三、线程池执行流程
当前线程少于核心线程,创建新的线程执行任务
大于等于核心线程,提交的任务放到阻塞队列
队列满了,创建新的线程
线程个数达到最大执行拒绝策略
问题:为什么先放入队列在判断是否达到最大线程
因为如果核心线程满了就创建新的线程去执行,达到最大线程数之后再放入队列,那么新创建线程就需要获取全局锁,对性能有很大的损耗。而且如果正在被核心线程执行的任务很快的执行完,就可以直接从任务队列中取出新的任务执行。这样不需要创建新的线程也可以完成任务,那么这样还可以节省一部分内存。
四、线程池属性标识
4.1 核心属性标识
// 表达两个意思 初始化是111 00000... // 1. 声明当前线程池的状态 // 2. 声明线程池中的线程数量 // 高3位是线程池的状态 低29位是当前线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 32-3=29 private static final int COUNT_BITS = Integer.SIZE - 3; // 位运算得到最大容量,就是28个1 private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 线程池的状态 ctl的高3位 位运算更快 RUNNING最小 TERMINATED最大 // 111 正常接收任务 private static final int RUNNING = -1 << COUNT_BITS; // 000 不接受任务 阻塞队列的饿任务继续处理 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 不接受任务 也不处理任务 还会中断当前任务 private static final int STOP = 1 << COUNT_BITS; // 010 过渡状态 即将关闭 private static final int TIDYING = 2 << COUNT_BITS; // 011 线程池彻底关闭 要执行terminated()才会真正关闭 private static final int TERMINATED = 3 << COUNT_BITS; // 当前线程池状态 private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 当前线程数量 指正在工作的线程 private static int workerCountOf(int c) { return c & COUNT_MASK; } private static int ctlOf(int rs, int wc) { return rs | wc; } // 如果希望提示位运算等操作,可以学习雪花算法,手写雪花算法之后就能精通各种位运算
4.2 线程池状态变化
五、Worker的封装
worker标识一个工作线程,worker继承AQS,实现了runable,worker实现了非重入互斥锁,强烈简历看这篇文章,因为这里实现了AQS,后文中execute使用了ReentrantLock从ReentrantLock的非公平独占锁实现来看AQS的原理
worker里面有一个线程,通过内部的工厂来创建线程,还有一个fristtask,firstTask是传入的第一个任务,如果非空,那么线程在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行workQueue中的任务,也就是非核心线程的创建。单个任务执行完毕后,worker会继续在workQueue中获取下一个任务继续执行。
来看看runWork方法吧
final void runWorker(Worker w) { // 获取一下当前的线程和任务 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // 前面拿到了可以置null w.firstTask = null; // 允许中断 w.unlock(); // allow interrupts // 标识 boolean completedAbruptly = true; try { // 有任务的时候先加个锁 没任务从阻塞队列拿了再加锁 while (task != null || (task = getTask()) != null) { // 加锁避免SHUTDOWM w.lock(); // 线程池状态在STOP 要中断线程 要是没有中断 要重写检查 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // before和after是留给调用者实现的 beforeExecute(wt, task); try { // 启动线程 task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { // 收尾工作 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
六、线程池的execute方法执行
从execute方法开始,就是线程池的执行流程
public void execute(Runnable command) { // 健壮性判断 为null抛出空指针异常 if (command == null) throw new NullPointerException(); // 拿到32位的int int c = ctl.get(); // 工作线程数量 < 核心线程数 if (workerCountOf(c) < corePoolSize) { // 添加任务到核心线程数 if (addWorker(command, true)) return; // 没用加锁 并发的时候会添加失败 重新获取核心线程数 c = ctl.get(); } // 工作状态并且将任务添加到队列 if (isRunning(c) && workQueue.offer(command)) { // 再次获取 ctl int recheck = ctl.get(); // 线程池不在RUNNING 就将任务那出来 因为其他状态不接收任务 if (! isRunning(recheck) && remove(command)) // 拒绝策略 reject(command); // 如果工作线程为0 else if (workerCountOf(recheck) == 0) // 添加一个空的工作线程,去处理上一个任务 // 不然上一个任务就一直放在队列中无法处理了 addWorker(null, false); } // 创建非核心线程处理任务 else if (!addWorker(command, false)) // 拒绝策略 reject(command); }
接下来分析addWork是如何添加一个线程的,由于没有锁操作,所以代码比较严谨,添加了许多额外的判断
JDK源码都是用for(;;)构造死循环,为什么不用while(true),是因为编译之后while(true)字节码更多,for指令更少,不占用寄存器性能更好
第一个大循环只是实现了工作数量+1 后面word才是添加线程,addWorker采用了需要条件判断,其实写的非常优雅,做了一次前置判断,减少了判断次数
private boolean addWorker(Runnable firstTask, boolean core) { // goto语法 我是万万没想到居然有goto retry: // 获取ctl for (int c = ctl.get();;) { // 满足一些特殊条件的时候构建线程失败 // 除了RUNNING都有可能 判断一下前置条件 减少判断次数 if (runStateAtLeast(c, SHUTDOWN) // 大于等于stop不处理阻塞队列 // 队列是空的不用构建线程 && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()) ) // 构建工作线程失败 return false; for (;;) { // 当前工作数量大于当前种类的最大数量 if (workerCountOf(c) // 用来限制线程数量的范围 >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // cas添加工作线程 if (compareAndIncrementWorkerCount(c)) // 退出外侧for break retry; // 并发的时候cas可能失败 重新获取 c = ctl.get(); // 判断线程池状态是否变化 变化了走外部循环 没变走内部 if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } // 上面只是将工作线程+1 这里才是真正创建word // worker开始 = false boolean workerStarted = false; // worker添加 = false boolean workerAdded = false; // 申明worker Worker w = null; try { // 实例化worker w = new Worker(firstTask); // 获取到这个线程 final Thread t = w.thread; if (t != null) { // 线程不为null 加锁 了解加锁看上面的链接 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); // 线程池状态在RUNNING if (isRunning(c) || // 状态值低于STOP firstTask为null (runStateLessThan(c, STOP) && firstTask == null)) { // 状态改变了抛出异常 if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 添加w workers.add(w); // true标识已经添加 workerAdded = true; // 获取大小 更新一下大小 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { // 放到finally一定会解锁 避免死锁 mainLock.unlock(); } // 添加成功了启动线程 if (workerAdded) { t.start(); // 标识启动 workerStarted = true; } } } finally { // 启动失败需要拿出这个Worker,内部依旧加锁实现回滚 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }