线程池ThreadPoolExcutor源码剖析---工作原理

简介: 线程池ThreadPoolExcutor源码剖析---工作原理

零、线程池宏观了解

线程池是一中池化技术,内部维护一定数量的线程,当有任务体提交的时候可以直接执行任务,避免了线程的创建和开销损耗。

一、Java构建线程的方式

  1. 继承Thread,也是实现了Runable
  2. 实现Runable
  3. 实现Callable,有返回值
  4. 线程池,java提供构成线程池的方式

Executors类可以创建线程池,阿里云不允许使用这种方式创建线程池,因为对线程池的控制力度太低了。

推荐使用构造函数手动创建线程池,也就引入了线程池的七大参数

二、线程池七大参数

核心线程数

最大线程数

最大空闲时间:多于核心线程的线程存活时间

时间单位

阻塞队列:最大线程数满了加入队列

线程工厂:给线程起个名字,方便线上排查问题

拒绝策略

  • 丢弃任务抛出异常
  • 丢出任务
  • 丢弃队列最前面的任务,重新提交被拒绝的任务
  • 由提交任务的线程处理该任务

三、线程池执行流程

当前线程少于核心线程,创建新的线程执行任务

大于等于核心线程,提交的任务放到阻塞队列

队列满了,创建新的线程

线程个数达到最大执行拒绝策略

问题:为什么先放入队列在判断是否达到最大线程

因为如果核心线程满了就创建新的线程去执行,达到最大线程数之后再放入队列,那么新创建线程就需要获取全局锁,对性能有很大的损耗。而且如果正在被核心线程执行的任务很快的执行完,就可以直接从任务队列中取出新的任务执行。这样不需要创建新的线程也可以完成任务,那么这样还可以节省一部分内存。

四、线程池属性标识

4.1 核心属性标识

// 表达两个意思 初始化是111 00000...
  // 1. 声明当前线程池的状态
  // 2. 声明线程池中的线程数量
  // 高3位是线程池的状态 低29位是当前线程个数
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // 32-3=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 位运算得到最大容量,就是28个1
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 线程池的状态 ctl的高3位 位运算更快 RUNNING最小 TERMINATED最大
    // 111 正常接收任务 
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000 不接受任务 阻塞队列的饿任务继续处理
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 不接受任务 也不处理任务 还会中断当前任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 过渡状态 即将关闭
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 线程池彻底关闭 要执行terminated()才会真正关闭
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 当前线程池状态
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    // 当前线程数量 指正在工作的线程
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 如果希望提示位运算等操作,可以学习雪花算法,手写雪花算法之后就能精通各种位运算

4.2 线程池状态变化

五、Worker的封装

worker标识一个工作线程,worker继承AQS,实现了runable,worker实现了非重入互斥锁,强烈简历看这篇文章,因为这里实现了AQS,后文中execute使用了ReentrantLock从ReentrantLock的非公平独占锁实现来看AQS的原理

worker里面有一个线程,通过内部的工厂来创建线程,还有一个fristtask,firstTask是传入的第一个任务,如果非空,那么线程在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行workQueue中的任务,也就是非核心线程的创建。单个任务执行完毕后,worker会继续在workQueue中获取下一个任务继续执行。

来看看runWork方法吧

final void runWorker(Worker w) {
    // 获取一下当前的线程和任务
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        // 前面拿到了可以置null
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        // 标识
        boolean completedAbruptly = true;
        try {
          // 有任务的时候先加个锁 没任务从阻塞队列拿了再加锁
            while (task != null || (task = getTask()) != null) {
              // 加锁避免SHUTDOWM
                w.lock();
                // 线程池状态在STOP 要中断线程 要是没有中断 要重写检查
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  // before和after是留给调用者实现的
                    beforeExecute(wt, task);
                    try {
                      // 启动线程
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                  // 收尾工作 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

六、线程池的execute方法执行

从execute方法开始,就是线程池的执行流程

public void execute(Runnable command) {
    // 健壮性判断 为null抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        // 拿到32位的int
        int c = ctl.get();
        // 工作线程数量 < 核心线程数
        if (workerCountOf(c) < corePoolSize) {
          // 添加任务到核心线程数
            if (addWorker(command, true))
                return;
            // 没用加锁 并发的时候会添加失败 重新获取核心线程数 
            c = ctl.get();
        }
        // 工作状态并且将任务添加到队列
        if (isRunning(c) && workQueue.offer(command)) {
          // 再次获取 ctl
            int recheck = ctl.get();
            // 线程池不在RUNNING 就将任务那出来 因为其他状态不接收任务
            if (! isRunning(recheck) && remove(command))
              // 拒绝策略
                reject(command);
            // 如果工作线程为0
            else if (workerCountOf(recheck) == 0)
              // 添加一个空的工作线程,去处理上一个任务 
              // 不然上一个任务就一直放在队列中无法处理了
                addWorker(null, false);
        }
        // 创建非核心线程处理任务
        else if (!addWorker(command, false))
          // 拒绝策略
            reject(command);
    }

接下来分析addWork是如何添加一个线程的,由于没有锁操作,所以代码比较严谨,添加了许多额外的判断

JDK源码都是用for(;;)构造死循环,为什么不用while(true),是因为编译之后while(true)字节码更多,for指令更少,不占用寄存器性能更好

第一个大循环只是实现了工作数量+1 后面word才是添加线程,addWorker采用了需要条件判断,其实写的非常优雅,做了一次前置判断,减少了判断次数

private boolean addWorker(Runnable firstTask, boolean core) {
    // goto语法 我是万万没想到居然有goto
        retry:
        // 获取ctl
        for (int c = ctl.get();;) {
            // 满足一些特殊条件的时候构建线程失败
            // 除了RUNNING都有可能 判断一下前置条件 减少判断次数
            if (runStateAtLeast(c, SHUTDOWN)
              // 大于等于stop不处理阻塞队列
              // 队列是空的不用构建线程
                && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())
                )
                // 构建工作线程失败
                return false;
            for (;;) {
              // 当前工作数量大于当前种类的最大数量
                if (workerCountOf(c)
                  // 用来限制线程数量的范围
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // cas添加工作线程
                if (compareAndIncrementWorkerCount(c))
                  // 退出外侧for
                    break retry;
                // 并发的时候cas可能失败 重新获取
                c = ctl.get(); 
                // 判断线程池状态是否变化 变化了走外部循环 没变走内部
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
            }
        }
    // 上面只是将工作线程+1 这里才是真正创建word
    // worker开始 = false
        boolean workerStarted = false;
        // worker添加 = false
        boolean workerAdded = false;
        // 申明worker
        Worker w = null;
        try {
          // 实例化worker
            w = new Worker(firstTask);
            // 获取到这个线程
            final Thread t = w.thread;
            if (t != null) {
              // 线程不为null 加锁 了解加锁看上面的链接
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   
                    int c = ctl.get();
          // 线程池状态在RUNNING
                    if (isRunning(c) ||
                      // 状态值低于STOP firstTask为null 
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 状态改变了抛出异常
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 添加w
                        workers.add(w);
                        // true标识已经添加
                        workerAdded = true;
                        // 获取大小 更新一下大小
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                  // 放到finally一定会解锁 避免死锁
                    mainLock.unlock();
                }
                // 添加成功了启动线程
                if (workerAdded) {
                    t.start();
                    // 标识启动
                    workerStarted = true;
                }
            }
        } finally {
          // 启动失败需要拿出这个Worker,内部依旧加锁实现回滚
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
相关文章
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
136 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
1月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
35 1
|
1月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
33 1
|
1月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
57 0
|
1月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
31 0
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
3月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
2月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
30 0