面试官:你分析过线程池源码吗?(下)

简介: 面试官:你分析过线程池源码吗?(下)
线程池状态的转换模型:


image.png


构造器


public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量
                          int maximumPoolSize,//最大线程数量
                          long keepAliveTime,//空闲线程多久关闭?
                          TimeUnit unit,// 计时单位
                          BlockingQueue<Runnable> workQueue,//放任务的阻塞队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler// 拒绝策略) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


在向线程池提交任务时,会通过两个方法:execute和submit。


本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。


execute方法:


public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // clt记录着runState和workerCount
    int c = ctl.get();
    //workerCountOf方法取出低29位的值,表示当前活动的线程数
    //然后拿线程数和 核心线程数做比较
    if (workerCountOf(c) < corePoolSize) {
        // 如果活动线程数<核心线程数
        // 添加到
        //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
        if (addWorker(command, true))
            // 如果成功则返回
            return;
        // 如果失败则重新获取 runState和 workerCount
        c = ctl.get();
    }
    // 如果当前线程池是运行状态并且任务添加到队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取 runState和 workerCount
        int recheck = ctl.get();
        // 如果不是运行状态并且 
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //第一个参数为null,表示在线程池中创建一个线程,但不去启动
            // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
            addWorker(null, false);
    }
    //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
    else if (!addWorker(command, false))
        //如果失败则拒绝该任务
        reject(command);
}


总结一下它的工作流程:


  1. workerCount &lt; corePoolSize,创建线程执行任务。


  1. workerCount &gt;= corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。


  1. workQueue已满,并且workerCount &gt;= corePoolSize,并且workerCount &lt; maximumPoolSize,创建线程执行任务。


  1. 当workQueue已满,workerCount &gt;= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。


image.png


通过上面的execute方法可以看到,最主要的逻辑还是在addWorker方法中实现的,那我们就看下这个方法:


addWorker方法


主要工作是在线程池中创建一个新的线程并执行


参数定义:


  • firstTask the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)


  • core if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值)。)


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //  获取运行状态
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)
        // 并且 如果 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 返回false
            return false;
        for (;;) {
            //获取线程数wc
            int wc = workerCountOf(c);
            // 如果wc大与容量 || core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加workerCount(原子操作)
            if (compareAndIncrementWorkerCount(c))
                // 如果增加成功,则跳出
                break retry;
            // wc增加失败,则再次获取runState
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        // 根据worker创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            // new一个锁
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 获取runState
                int rs = runStateOf(ctl.get());
                // 如果rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null)
                // firstTask == null证明只新建线程而不执行任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 如果t活着就抛异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 否则加入worker(HashSet)
                    //workers包含池中的所有工作线程。仅在持有mainLock时访问。
                    workers.add(w);
                    // 获取工作线程数量
                    int s = workers.size();
                    //largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        // 如果 s比它还要大,则将s赋值给它
                        largestPoolSize = s;
                    // worker的添加工作状态改为true    
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果worker的添加工作完成
            if (workerAdded) {
                // 启动线程
                t.start();
                // 修改线程启动状态
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回线启动状态
    return workerStarted;

为什么需要持有mainLock?


因为workers是HashSet类型的,不能保证线程安全。


w = new Worker(firstTask);如何理解呢


Worker.java


private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable


可以看到它继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。


Worker为什么不使用ReentrantLock来实现呢?


tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。


线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。


public void run() {
    runWorker(this);
}


run方法又调用了runWorker方法:


final void runWorker(Worker w) {
    // 拿到当前线程
    Thread wt = Thread.currentThread();
    // 拿到当前任务
    Runnable task = w.firstTask;
    // 将Worker.firstTask置空 并且释放锁
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果task或者getTask不为空,则一直循环
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //  return ctl.get() >= stop 
            // 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断
            // 其实就是保证两点:
            // 1. 线程池没有停止
            // 2. 保证线程没有中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断当前线程
                wt.interrupt();
            try {
                // 空方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行run方法(Runable对象)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 执行完后, 将task置空, 完成任务++, 释放锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 退出工作
        processWorkerExit(w, completedAbruptly);
    }


总结一下runWorker方法的执行过程:


  1. while循环中,不断地通过getTask()方法从workerQueue中获取任务


  1. 如果线程池正在停止,则中断线程。否则调用3.


  1. 调用task.run()执行任务;


  1. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);


这个流程图非常经典:



image.png


除此之外,ThreadPoolExector还提供了tryAcquiretryReleaseshutdownshutdownNowtryTerminate、等涉及的一系列线程状态更改的方法有兴趣可以自己研究。大体思路是一样的,这里不做介绍。


Worker为什么不使用ReentrantLock来实现呢?


tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。


线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。


在runWorker方法中,为什么要在执行任务的时候对每个工作线程都加锁呢?


shutdown方法与getTask方法存在竞态条件.(这里不做深入,建议自己深入研究,对它比较熟悉的面试官一般会问)


高频考点


  1. 创建线程池的五个方法。
  2. 线程池的五个状态
  3. execute执行过程。
  4. runWorker执行过程。(把两个流程图记下,理解后说个大该就行。)
  5. 比较深入的问题就是我在文中插入的问题。
  6. …期望大家能在评论区补充。


声明:图片来源于网络,侵删。


相关文章
|
2月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
271 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
7月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
6月前
|
存储 安全 Java
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
327 3
|
7月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
7月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
8月前
|
Java 中间件 调度
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
299 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
|
11月前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
11月前
|
安全 Java 程序员
面试必看:如何设计一个可以优雅停止的线程?
嘿,大家好!我是小米。今天分享一篇关于“如何停止一个正在运行的线程”的面试干货。通过一次Java面试经历,我明白了停止线程不仅仅是技术问题,更是设计问题。Thread.stop()已被弃用,推荐使用Thread.interrupt()、标志位或ExecutorService来优雅地停止线程,避免资源泄漏和数据不一致。希望这篇文章能帮助你更好地理解Java多线程机制,面试顺利! 我是小米,喜欢分享技术的29岁程序员。欢迎关注我的微信公众号“软件求生”,获取更多技术干货!
291 53
|
11月前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
850 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析

热门文章

最新文章