ThreadPoolExecutor解析

简介: 本文深入解析Java线程池核心类ThreadPoolExecutor的实现原理,涵盖工作队列、线程工厂、拒绝策略等关键组件,剖析任务提交与执行流程,揭示Worker工作机制及线程复用原理,帮助理解线程池如何高效管理并发任务。

上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。
下面是几个比较关键的类成员:

// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private final BlockingQueue workQueue;

//任务的执行值集合,来消费workQueue里面的任务
private final HashSet workers = new HashSet();

//线程工厂
private volatile ThreadFactory threadFactory;

//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
private volatile RejectedExecutionHandler handler;

1、CallerRunsPolicy:在调用者线程里面运行该任务
2、DiscardPolicy:丢弃任务
3、DiscardOldestPolicy:丢弃workQueue的头部任务

//最下保活work数量
private volatile int corePoolSize;

//work上限
private volatile int maximumPoolSize;

我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
step 1:
Future<?> submit(Runnable task);

step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
        reject(command); //还不行?拒绝提交的任务
}

step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core) 


step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包装任务
final Thread t = w.thread; //获取线程(包含任务)
workers.add(w);   // 任务被放到works中
t.start(); //执行任务

上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
编译前 编译后 while (1); mov eax,1 test eax,eax je foo+23h jmp foo+18h
编译前 编译后 for (;;); jmp foo+23h   

对比之下,for (;;)指令少,不占用寄存器,而且没有判断跳转,比while (1)好。
也就是说两者在在宏观上完全一样的逻辑,但是底层完全不一样,for相对于来说更加简洁明了
其实核心也就一句:
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
|---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

  • When a new task is submitted in method {@link #execute(Runnable)},
    • and fewer than corePoolSize threads are running, a new thread is
    • created to handle the request, even if other worker threads are
    • idle. If there are more than corePoolSize but less than
    • maximumPoolSize threads running, a new thread will be created only
    • if the queue is full. By setting corePoolSize and maximumPoolSize
    • the same, you create a fixed-size thread pool. By setting
    • maximumPoolSize to an essentially unbounded value such as {@code
    • Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
    • number of concurrent tasks. Most typically, core and maximum pool
    • sizes are set only upon construction, but they may also be changed
    • dynamically using {@link #setCorePoolSize} and {@link
    • setMaximumPoolSize}.

在方法{@link #execute(Runnable)}中提交新任务时,
如果运行的线程小于corePoolSize,则创建新线程处理请求,即使其他工作线程闲置。
如果运行的线程大于corePoolSize,但是小于maximumPoolSize,当线程运行时,如果队列已满则会创建一个新线程
同样通过设置corePoolSize和maximumPoolSize,创建一个固定大小的线程池。通过设置maximumPoolSize到一个
本质上无界的值,比如{@code Integer.MAX_VALUE},您允许池容纳任意的并发任务的数量。
最典型的是核心池和最大池尺寸只在构造时设置,但也可以更改动态使用{@link #setCorePoolSize}和{@link #setMaximumPoolSize}。
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
Plain Text
复制代码
1
2
3

  • If the pool currently has more than corePoolSize threads,
    • excess threads will be terminated if they have been idle for more
    • than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
相关文章
|
存储 边缘计算 人工智能
云计算的发展趋势及其在行业中的应用
云计算作为当今信息技术领域的关键驱动力,正在不断演进和创新。从多云模式到边缘计算,从商业到医疗,云计算在多个行业中展现了巨大的应用潜力。未来,随着技术的不断发展,云计算将继续引领着数字化转型的浪潮,为各个行业带来更多的创新和机遇。
1972 0
云计算的发展趋势及其在行业中的应用
|
存储 缓存 NoSQL
Redis 服务器全方位介绍:从入门到核心原理
Redis是一款高性能内存键值数据库,支持字符串、哈希、列表等多种数据结构,广泛用于缓存、会话存储、排行榜及消息队列。其单线程事件循环架构保障高并发与低延迟,结合RDB和AOF持久化机制兼顾性能与数据安全。通过主从复制、哨兵及集群模式实现高可用与横向扩展,适用于现代应用的多样化场景。合理配置与优化可显著提升系统性能与稳定性。
829 0
|
7月前
|
移动开发 Shell
用于连续波雷达的二进制频移键控——论文阅读
本文提出一种基于二进制频移键控(BFSK)的连续波雷达新波形,利用勒让德序列实现理想周期自相关与完美互相关特性。该波形具恒定包络、频谱紧凑、旁瓣抑制优异等优势,支持匹配与失配滤波灵活切换,适用于多址雷达网络,但对频率源精度要求较高。
337 1
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
812 0
|
敏捷开发 供应链 数据可视化
如何利用精益生产管理工具提升项目执行力?推荐7款必备工具
本文介绍了七款精益生产管理工具,包括板栗看板、LeanKit、Targetprocess、Miro、Smartsheet、Airtable 和 LiquidPlanner,详细阐述了各工具的功能亮点及其在不同行业的应用,旨在帮助企业提高效率、减少浪费、优化流程,实现项目管理的持续改进。
如何利用精益生产管理工具提升项目执行力?推荐7款必备工具
|
IDE 开发工具 数据安全/隐私保护
如何对PDF的加密和破解?
PDF文档的加密与暴力破解加密文档
631 0
|
机器学习/深度学习 人工智能 自然语言处理
梦入丹青境,变换由心生
**阿里通义的“丹青-千变万换”是图像处理技术,让用户轻松替换图片内容,如人脸、衣物和背景。该技术基于深度学习,能精确分离图像元素,实现自然的图像修改。用户通过简单步骤即可实现创意变换:选择图片、标记保留对象、输入生成参数,然后运行。此工具适用于广告、个性化媒体内容创建,帮助设计师高效工作,促进个性化营销。[Learn More](https://modelscope.cn/studios/iic/ReplaceAnything)**
|
Linux API C语言
lua 如何在嵌入式Linux中与c语言结合
lua 如何在嵌入式Linux中与c语言结合
484 1
|
存储 PyTorch 算法框架/工具
【chat-gpt问答记录】关于pytorch中的线性层nn.Linear()
【chat-gpt问答记录】关于pytorch中的线性层nn.Linear()
669 0
|
Linux 数据安全/隐私保护
进程间通信之共享内存及其shm函数的使用【Linux】
进程间通信之共享内存及其shm函数的使用【Linux】
947 2