101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(一)

简介: 101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(一)

101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(一)


前提

很早之前就打算看一次JUC线程池ThreadPoolExecutor的源码实现,由于近段时间比较忙,一直没有时间整理出源码分析的文章。之前在分析扩展线程池实现可回调的Future时候曾经提到并发大师Doug Lea在设计线程池ThreadPoolExecutor的提交任务的顶层接口Executor只有一个无状态的执行方法:

public interface Executor {
    void execute(Runnable command);
}

而ExecutorService提供了很多扩展方法底层基本上是基于Executor#execute()方法进行扩展。本文着重分析ThreadPoolExecutor#execute()的实现,笔者会从实现原理、源码实现等角度结合简化例子进行详细的分析。ThreadPoolExecutor的源码从JDK8到JDK11基本没有变化,本文编写的时候使用的是JDK11。

ThreadPoolExecutor的原理

ThreadPoolExecutor里面使用到JUC同步器框架AbstractQueuedSynchronizer(俗称AQS)、大量的位操作、CAS操作。ThreadPoolExecutor提供了固定活跃线程(核心线程)、额外的线程(线程池容量 - 核心线程数这部分额外创建的线程,下面称为非核心线程)、任务队列以及拒绝策略这几个重要的功能。

JUC同步器框架

ThreadPoolExecutor里面使用到JUC同步器框架,主要用于四个方面:

全局锁mainLock成员属性,是可重入锁ReentrantLock类型,主要是用于访问工作线程Worker集合和进行数据统计记录时候的加锁操作。

条件变量termination,Condition类型,主要用于线程进行等待终结awaitTermination()方法时的带期限阻塞。

任务队列workQueue,BlockingQueue类型,任务队列,用于存放待执行的任务。

工作线程,内部类Worker类型,是线程池中真正的工作线程对象。

关于AQS笔者之前写过一篇相关源码分析的文章:JUC同步器框架AbstractQueuedSynchronizer源码图文分析。

核心线程

这里先参考ThreadPoolExecutor的实现并且进行简化,实现一个只有核心线程的线程池,要求如下:

暂时不考虑任务执行异常情况下的处理。

任务队列为无界队列。

线程池容量固定为核心线程数量。

暂时不考虑拒绝策略。

public class CoreThreadPool implements Executor {
    private BlockingQueue<Runnable> workQueue;
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private int coreSize;
    private int threadCount = 0;
    public CoreThreadPool(int coreSize) {
        this.coreSize = coreSize;
        this.workQueue = new LinkedBlockingQueue<>();
    }
    @Override
    public void execute(Runnable command) {
        if (++threadCount <= coreSize) {
            new Worker(command).start();
        } else {
            try {
                workQueue.put(command);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }
    private class Worker extends Thread {
        private Runnable firstTask;
        public Worker(Runnable runnable) {
            super(String.format("Worker-%d", COUNTER.getAndIncrement()));
            this.firstTask = runnable;
        }
        @Override
        public void run() {
            Runnable task = this.firstTask;
            while (null != task || null != (task = getTask())) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
        }
    }
    private Runnable getTask() {
        try {
            return workQueue.take();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
    public static void main(String[] args) throws Exception {
        CoreThreadPool pool = new CoreThreadPool(5);
        IntStream.range(0, 10)
                .forEach(i -> pool.execute(() ->
                        System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
        Thread.sleep(Integer.MAX_VALUE);
    }
}

某次运行结果如下:

Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9

设计此线程池的时候,核心线程是懒创建的,如果线程空闲的时候则阻塞在任务队列的take()方法,其实对于ThreadPoolExecutor也是类似这样实现,只是如果使用了keepAliveTime并且允许核心线程超时(allowCoreThreadTimeOut设置为true)则会使用BlockingQueue#poll(keepAliveTime)进行轮询代替永久阻塞。

其他附加功能

构建ThreadPoolExecutor实例的时候,需要定义maximumPoolSize(线程池最大线程数)和corePoolSize(核心线程数)。当任务队列是有界的阻塞队列,核心线程满负载,任务队列已经满的情况下,会尝试创建额外的maximumPoolSize - corePoolSize个线程去执行新提交的任务。当ThreadPoolExecutor这里实现的两个主要附加功能是:

一定条件下会创建非核心线程去执行任务,非核心线程的回收周期(线程生命周期终结时刻)是keepAliveTime,线程生命周期终结的条件是:下一次通过任务队列获取任务的时候并且存活时间超过keepAliveTime。

提供拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。

源码分析

先分析线程池的关键属性,接着分析其状态控制,最后重点分析ThreadPoolExecutor#execute()方法。

关键属性

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 控制变量-存放状态和线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 任务队列,必须是阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    // 全局锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // awaitTermination方法使用的等待条件变量
    private final Condition termination = mainLock.newCondition();
    // 记录峰值线程数
    private int largestPoolSize;
    // 记录已经成功执行完毕的任务数
    private long completedTaskCount;
    // 线程工厂,用于创建新的线程实例
    private volatile ThreadFactory threadFactory;
    // 拒绝执行处理器,对应不同的拒绝策略
    private volatile RejectedExecutionHandler handler;
    // 空闲线程等待任务的时间周期,单位是纳秒
    private volatile long keepAliveTime;
    // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
    private volatile boolean allowCoreThreadTimeOut;
    // 核心线程数
    private volatile int corePoolSize;
    // 线程池容量
    private volatile int maximumPoolSize;
    // 省略其他代码
}

下面看参数列表最长的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:

corePoolSize:int类型,核心线程数量。

maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。

keepAliveTime:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。

unit:TimeUnit类型,keepAliveTime参数的时间单位,实际上keepAliveTime最终会转化为纳秒。

workQueue:BlockingQueue类型,等待队列或者叫任务队列。

threadFactory:ThreadFactory类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。

handler:RejectedExecutionHandler 类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:

AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。

DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。

DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。

CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。

状态控制

状态控制主要围绕原子整型成员变量ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// 通过ctl值获取运行状态
private static int runStateOf(int c) { 
    return c & ~COUNT_MASK; 
}
// 通过ctl值获取工作线程数
private static int workerCountOf(int c) {
    return c & COUNT_MASK; 
}
// 通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { 
    return rs | wc; 
}
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
// CAS操作线程数增加1
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
// CAS操作线程数减少1
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
// 线程数直接减少1
private void decrementWorkerCount() {
    ctl.addAndGet(-1);
}

接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:

我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有23种。工作线程上限数量为229 - 1,超过5亿,这个数量在短时间内不用考虑会超限。

接着看工作线程上限数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:

然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING状态:

// -1的补码为:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10进制值为:-536870912
// 高3位111的值就是表示线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;

控制变量ctl的组成就是通过线程池运行状态rs和工作线程数wc通过或运算得到的:

// rs=RUNNING值为:111-00000000000000000000000000000
// wc的值为0:000-00000000000000000000000000000
// rs | wc的结果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

那么我们怎么从ctl中取出高3位的线程池状态?上面源码中提供的runStateOf()方法就是提取运行状态:

// 先把COUNT_MASK取反(~COUNT_MASK),
得到:111-00000000000000000000000000000
// ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可得到高3位xxx
private static int runStateOf(int c){
    return c & ~COUNT_MASK;
}

同理,取出低29位的工作线程数量只需要把ctl和COUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。

工作线程数为0的前提下,小结一下线程池的运行状态常量:

image.png这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl进行比较,或者使用ctl和线程池状态常量进行比较)来比较和判断线程池的状态:

工作线程数为0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

下面这三个方法就是使用这种技巧:


// ctl和状态常量比较,判断是否小于
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// ctl和状态常量比较,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

最后是线程池状态的跃迁图:

]

PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status。

目录
相关文章
|
2月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
15天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
72 17
|
2月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
231 64
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
1月前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
126 38
|
2月前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####