异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解1

简介: 异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解


线程池ThreadPoolExecutor原理剖析

线程池类图结构


成员变量ctl



成员变量ctl是Integer的原子变量使用一个变量同时记录线程池状态和线程池中线程个数 [线程池状态(高3位),线程个数(低29位)],假设计算机硬件的Integer类型是32位二进制标示,如下面代码所示,其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数:

//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


线程池的主要状态


线程池的主要状态列举如下:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;


   线程池状态含义:


   RUNNING:接收新任务并且处理阻塞队列里的任务。

   SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

   STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时中断正在处理的任务。

   TIDYING:所有任务都执行完(包含阻塞队列里面任务),当前线程池活动线程为0,将要调用terminated方法。

   TERMINATED:终止状态。terminated方法调用完成以后的状态。


   线程池状态之间转换路径


   RUNNING→SHUTDOWN:当显式调用shutdown()方法时,或者隐式调用了finalize(),它里面调用了shutdown()方法时。


   RUNNING或者SHUTDOWN→STOP:当显式调用shutdownNow()方法时。

   SHUTDOWN→TIDYING:当线程池和任务队列都为空时。

   STOP→TIDYING:当线程池为空时。

   TIDYING→TERMINATED:当terminated()hook方法执行完成时。

   线程池同时提供了一些方法用来获取线程池的运行状态和线程池中的线程个数


代码如下:

// 获取高三位 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取低29位 线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }



线程池的参数


另外线程池是可配置的,使用者可以根据自己的需要对线程池的参数进行调整,如类图中线程池提供了可供使用者配置的参数:


corePoolSize:线程池核心线程个数。

workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界Array-BlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue、优先级队列PriorityBlockingQueue等。

maximunPoolSize:线程池最大线程数量。

threadFactory:创建线程的工厂类。

defaultHandler:饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略,比如AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)、DiscardPolicy(默默丢弃,不抛出异常)。

keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。

d415cd99d2494719b73e7b70e3788a67.png


变量mainLock是独占锁,用来控制新增Worker线程时的原子性,termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。


Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,实现了简单不可重入独占锁,其中state=0标示锁未被获取的状态,state=1标示锁已经被获取的状态,state=–1是创建Worker时默认的状态。创建时状态设置为–1是为了避免该线程在运行runWorker()方法前被中断,下面会具体讲解到。其中变量firstTask记录该工作线程执行的第一个任务,Thread是具体执行任务的线程。


DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中,poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。


ThreadPoolExecutor提供了一系列构造函数让我们创建线程池,比如:


ThreadPoolExecutor(int corePoolSize,//核心线程个数
                   int maximumPoolSize,//最大线程个数
                   long keepAliveTime,//非核心不活跃线程最大存活时间
                   TimeUnit unit,//keepAliveTime的单位
                   BlockingQueue<Runnable> workQueue,//阻塞队列类型
                   ThreadFactory threadFactory,//线程池创建工厂
                   RejectedExecutionHandler handler)//拒绝策略


则当我们需要创建自己的线程池时,就可以显式地新建一个该实例出来。



提交任务到线程池原理解析

ThreadPoolExecutor中提交任务到线程池的方法有下面几种



public void execute(Runnable command)

首先我们看方法public void execute(Runnable command)提交任务到线程池的逻辑:

public void execute(Runnable command) {
    //(1) 如果任务为null,则抛出NPE异常
    if (command == null)
        throw new NullPointerException();
    //(2)获取当前线程池的状态+线程个数变量的组合值
    int c = ctl.get();
    //(3)当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        //(4.1)二次检查
        int recheck = ctl.get();
        //(4.2)如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //(4.3)如果当前线程池线程为空,则添加一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)如果队列满了,则新增线程,新增失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command


  • 代码3是指如果当前线程池线程个数小于corePoolSize,则会在调用方法addWorker新增一个核心线程执行该任务。
  • 如果当前线程池线程个数大于等于corePoolSize则执行代码4,如果当前线程池处于RUNNING状态则添加当前任务到任务队列。这里需要判断线程池状态是因为线程池有可能已经处于非RUNNING状态,而非RUNNING状态下是抛弃新任务的。
  • 如果任务添加任务队列成功,则执行代码4.2对线程池状态进行二次校验,这是因为添加任务到任务队列后,执行代码4.2前线程池的状态有可能已经变化了,如果当前线程池状态不是RUNNING则把任务从任务队列移除,移除后执行拒绝策略;如果二次校验通过,则执行代码4.3重新判断当前线程池里面是否还有线程,如果没有则新增一个线程。
  • 如果代码4添加任务失败,则说明任务队列满了,那么执行代码5尝试调用addWorker方法新开启线程来执行该任务;如果当前线程池的线程个数大于maximumPoolSize则addWorker返回false,执行配置的拒绝策略。



public Future<?>submit(Runnable task)

下面我们来看public Future<?>submit(Runnable task)方法提交任务的逻辑:

public Future<?> submit(Runnable task) {
    // 6 NPE判断
    if (task == null) throw new NullPointerException();
    // 7 包装任务为FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // 8 投递到线程池执行
    execute(ftask);
    // 9 返回ftask
    return ftask;
}


代码7调用newTaskFor方法对我们提交的Runnable类型任务进行包装,包装为RunnableFuture类型任务,然后提交RunnableFuture任务到线程池后返回ftask对象。

下面我们来看newTaskFor的代码逻辑:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}


代码可知,其内部创建了一个FutureTask对象,构造函数如下:

 public FutureTask(Runnable runnable, V result) {
        //将runnable适配为Callable类型任务,并且让result作为执行结果
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }


上述代码中的FutureTask会在运行时执行给定的Runnable,并将在任务Runnable执行完成后,把给定的结果value通过FutureTask的get方法返回。

public Future submit(Runnable task,T result)

下面我们看public Future submit(Runnable task,T result)方法的逻辑,其代码如下:


public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

由上述代码可知,两个参数的submit方法类似,不同在于该方法接收的是含有返回值的Callable类型的任务,最终也是转换为FutureTask后提交到线程池,并返回。



线程池中任务执行原理解析


当用户线程提交任务到线程池后,在线程池没有执行拒绝策略的情况下,用户线程会马上返回,而提交的任务要么直接切换到线程池中的Worker线程来执行,要么先放入线程池的阻塞队列里面,稍后再由Worker线程来执行。 我们就看下具体执行异步任务的Worker线程是如何工作的。首先我们看下Worker的构造函数:


Worker(Runnable firstTask) {
    setState(-1); // 在调用runWorker前禁止中断
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//创建一个线程
}


在上述代码中,Worker构造函数内首先设置Worker的运行状态status为–1,是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态≥0则会中断该线程)。我们讲到Worker继承了AbstractQueuedSynchronizer类,实现了简单不可重入独占锁,其中status=0标示锁未被获取的状态,state=1标示锁已经被获取的状态,state=–1是创建Worker时默认的状态。然后把传递的任务firstTask保存起来,最后使用线程池中指定的线程池工厂创建一个线程作为该Worker对象的执行线程。


由于Worker本身实现了Runnable方法,所以下面我们看其run方法内是如何执行任务的:


 public void run() {
            runWorker(this);//委托给runWorker方法
        }


runWorker方法的代码如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); //(1)status设置为0,允许中断
        boolean completedAbruptly = true;
        try {
           //(2)
            while (task != null || (task = getTask()) != null) {
                 //(2.1)
                w.lock();
               ...
                try {
                    //(2.2)任务执行前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//(2.3)执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //(2.4)任务执行完毕后干一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //(2.5)统计当前Worker完成了多少个任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //(3)执行清工作
            processWorkerExit(w, completedAbruptly);
        }
    }


如上代码在运行runWorker的代码1时会调用unlock方法,该方法把status变为了0,所以这时候调用shutdownNow会中断Worker线程。


如代码2所示,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码3执行清理工作,当前Worker也就退出执行了。如果task不为null则执行代码2.1获取工作线程内部持有的独占锁,然后执行扩展接口代码2.2,代码2.3具体执行任务,代码2.4在任务执行完毕后做一些事情,代码2.5统计当前Worker完成了多少个任务,并释放锁。


这里在执行具体任务期间加锁,是为了避免任务运行期间,其他线程调用了shutdown方法关闭线程池时中断正在执行任务的线程。

相关文章
|
1月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
147 1
|
3月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
233 1
|
6月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
6月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
5月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
248 0
|
7月前
|
Java 中间件 调度
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
254 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
|
8月前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
392 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
9月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
10月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
234 6
|
1月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
139 6

热门文章

最新文章