线程池ThreadPoolExcutor源码剖析---工作原理

简介: 线程池ThreadPoolExcutor源码剖析---工作原理

零、线程池宏观了解

线程池是一中池化技术,内部维护一定数量的线程,当有任务体提交的时候可以直接执行任务,避免了线程的创建和开销损耗。

一、Java构建线程的方式

  1. 继承Thread,也是实现了Runable
  2. 实现Runable
  3. 实现Callable,有返回值
  4. 线程池,java提供构成线程池的方式

Executors类可以创建线程池,阿里云不允许使用这种方式创建线程池,因为对线程池的控制力度太低了。

推荐使用构造函数手动创建线程池,也就引入了线程池的七大参数

二、线程池七大参数

核心线程数

最大线程数

最大空闲时间:多于核心线程的线程存活时间

时间单位

阻塞队列:最大线程数满了加入队列

线程工厂:给线程起个名字,方便线上排查问题

拒绝策略

  • 丢弃任务抛出异常
  • 丢出任务
  • 丢弃队列最前面的任务,重新提交被拒绝的任务
  • 由提交任务的线程处理该任务

三、线程池执行流程

当前线程少于核心线程,创建新的线程执行任务

大于等于核心线程,提交的任务放到阻塞队列

队列满了,创建新的线程

线程个数达到最大执行拒绝策略

问题:为什么先放入队列在判断是否达到最大线程

因为如果核心线程满了就创建新的线程去执行,达到最大线程数之后再放入队列,那么新创建线程就需要获取全局锁,对性能有很大的损耗。而且如果正在被核心线程执行的任务很快的执行完,就可以直接从任务队列中取出新的任务执行。这样不需要创建新的线程也可以完成任务,那么这样还可以节省一部分内存。

四、线程池属性标识

4.1 核心属性标识

// 表达两个意思 初始化是111 00000...
  // 1. 声明当前线程池的状态
  // 2. 声明线程池中的线程数量
  // 高3位是线程池的状态 低29位是当前线程个数
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // 32-3=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 位运算得到最大容量,就是28个1
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 线程池的状态 ctl的高3位 位运算更快 RUNNING最小 TERMINATED最大
    // 111 正常接收任务 
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000 不接受任务 阻塞队列的饿任务继续处理
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 不接受任务 也不处理任务 还会中断当前任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 过渡状态 即将关闭
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 线程池彻底关闭 要执行terminated()才会真正关闭
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 当前线程池状态
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    // 当前线程数量 指正在工作的线程
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 如果希望提示位运算等操作,可以学习雪花算法,手写雪花算法之后就能精通各种位运算

4.2 线程池状态变化

五、Worker的封装

worker标识一个工作线程,worker继承AQS,实现了runable,worker实现了非重入互斥锁,强烈简历看这篇文章,因为这里实现了AQS,后文中execute使用了ReentrantLock从ReentrantLock的非公平独占锁实现来看AQS的原理

worker里面有一个线程,通过内部的工厂来创建线程,还有一个fristtask,firstTask是传入的第一个任务,如果非空,那么线程在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行workQueue中的任务,也就是非核心线程的创建。单个任务执行完毕后,worker会继续在workQueue中获取下一个任务继续执行。

来看看runWork方法吧

final void runWorker(Worker w) {
    // 获取一下当前的线程和任务
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        // 前面拿到了可以置null
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        // 标识
        boolean completedAbruptly = true;
        try {
          // 有任务的时候先加个锁 没任务从阻塞队列拿了再加锁
            while (task != null || (task = getTask()) != null) {
              // 加锁避免SHUTDOWM
                w.lock();
                // 线程池状态在STOP 要中断线程 要是没有中断 要重写检查
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  // before和after是留给调用者实现的
                    beforeExecute(wt, task);
                    try {
                      // 启动线程
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                  // 收尾工作 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

六、线程池的execute方法执行

从execute方法开始,就是线程池的执行流程

public void execute(Runnable command) {
    // 健壮性判断 为null抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        // 拿到32位的int
        int c = ctl.get();
        // 工作线程数量 < 核心线程数
        if (workerCountOf(c) < corePoolSize) {
          // 添加任务到核心线程数
            if (addWorker(command, true))
                return;
            // 没用加锁 并发的时候会添加失败 重新获取核心线程数 
            c = ctl.get();
        }
        // 工作状态并且将任务添加到队列
        if (isRunning(c) && workQueue.offer(command)) {
          // 再次获取 ctl
            int recheck = ctl.get();
            // 线程池不在RUNNING 就将任务那出来 因为其他状态不接收任务
            if (! isRunning(recheck) && remove(command))
              // 拒绝策略
                reject(command);
            // 如果工作线程为0
            else if (workerCountOf(recheck) == 0)
              // 添加一个空的工作线程,去处理上一个任务 
              // 不然上一个任务就一直放在队列中无法处理了
                addWorker(null, false);
        }
        // 创建非核心线程处理任务
        else if (!addWorker(command, false))
          // 拒绝策略
            reject(command);
    }

接下来分析addWork是如何添加一个线程的,由于没有锁操作,所以代码比较严谨,添加了许多额外的判断

JDK源码都是用for(;;)构造死循环,为什么不用while(true),是因为编译之后while(true)字节码更多,for指令更少,不占用寄存器性能更好

第一个大循环只是实现了工作数量+1 后面word才是添加线程,addWorker采用了需要条件判断,其实写的非常优雅,做了一次前置判断,减少了判断次数

private boolean addWorker(Runnable firstTask, boolean core) {
    // goto语法 我是万万没想到居然有goto
        retry:
        // 获取ctl
        for (int c = ctl.get();;) {
            // 满足一些特殊条件的时候构建线程失败
            // 除了RUNNING都有可能 判断一下前置条件 减少判断次数
            if (runStateAtLeast(c, SHUTDOWN)
              // 大于等于stop不处理阻塞队列
              // 队列是空的不用构建线程
                && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())
                )
                // 构建工作线程失败
                return false;
            for (;;) {
              // 当前工作数量大于当前种类的最大数量
                if (workerCountOf(c)
                  // 用来限制线程数量的范围
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // cas添加工作线程
                if (compareAndIncrementWorkerCount(c))
                  // 退出外侧for
                    break retry;
                // 并发的时候cas可能失败 重新获取
                c = ctl.get(); 
                // 判断线程池状态是否变化 变化了走外部循环 没变走内部
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
            }
        }
    // 上面只是将工作线程+1 这里才是真正创建word
    // worker开始 = false
        boolean workerStarted = false;
        // worker添加 = false
        boolean workerAdded = false;
        // 申明worker
        Worker w = null;
        try {
          // 实例化worker
            w = new Worker(firstTask);
            // 获取到这个线程
            final Thread t = w.thread;
            if (t != null) {
              // 线程不为null 加锁 了解加锁看上面的链接
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   
                    int c = ctl.get();
          // 线程池状态在RUNNING
                    if (isRunning(c) ||
                      // 状态值低于STOP firstTask为null 
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 状态改变了抛出异常
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 添加w
                        workers.add(w);
                        // true标识已经添加
                        workerAdded = true;
                        // 获取大小 更新一下大小
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                  // 放到finally一定会解锁 避免死锁
                    mainLock.unlock();
                }
                // 添加成功了启动线程
                if (workerAdded) {
                    t.start();
                    // 标识启动
                    workerStarted = true;
                }
            }
        } finally {
          // 启动失败需要拿出这个Worker,内部依旧加锁实现回滚
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
相关文章
|
1月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
1月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
14天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
4天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
1月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
1月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
86 8
|
25天前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
12 0
|
2月前
|
存储 SQL Java
(七)全面剖析Java并发编程之线程变量副本ThreadLocal原理分析
在之前的文章:彻底理解Java并发编程之Synchronized关键字实现原理剖析中我们曾初次谈到线程安全问题引发的"三要素":多线程、共享资源/临界资源、非原子性操作,简而言之:在同一时刻,多条线程同时对临界资源进行非原子性操作则有可能产生线程安全问题。
|
2月前
|
监控 Java 开发者
深入理解Java并发编程:线程池的原理与实践
【5月更文挑战第85天】 在现代Java应用开发中,高效地处理并发任务是提升性能和响应能力的关键。线程池作为一种管理线程的机制,其合理使用能够显著减少资源消耗并优化系统吞吐量。本文将详细探讨线程池的核心原理,包括其内部工作机制、优势以及如何在Java中正确实现和使用线程池。通过理论分析和实例演示,我们将揭示线程池对提升Java应用性能的重要性,并给出实践中的最佳策略。
|
1月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践