高并发之——ThreadPoolExecutor类居然是这样保证线程池正确运行的...

简介: 对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢?

问题

对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢?

ThreadPoolExecutor类中的重要属性

在ThreadPoolExecutor类中,存在几个非常重要的属性和方法,接下来,我们就介绍下这些重要的属性和方法。

ctl相关的属性

AtomicInteger类型的常量ctl是贯穿线程池整个生命周期的重要属性,它是一个原子类对象,主要用来保存线程的数量和线程池的状态,我们看下与这个属性相关的代码如下所示。

//主要用来保存线程数量和线程池的状态,高3位保存线程状态,低29位保存线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程池中线程的数量的位数(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
//表示线程池中的最大线程数量
//将数字1的二进制值向右移29位,再减去1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//线程池的运行状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//获取线程状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

对于线程池的各状态说明如下所示。

  • RUNNING:运行状态,能接收新提交的任务,并且也能处理阻塞队列中的任务
  • SHUTDOWN: 关闭状态,不能再接收新提交的任务,但是可以处理阻塞队列中已经保存的任务,当线程池处于RUNNING状态时,调用shutdown()方法会使线程池进入该状态
  • STOP: 不能接收新任务,也不能处理阻塞队列中已经保存的任务,会中断正在处理任务的线程,如果线程池处于RUNNING或SHUTDOWN状态,调用shutdownNow()方法,会使线程池进入该状态
  • TIDYING: 如果所有的任务都已经终止,有效线程数为0(阻塞队列为空,线程池中的工作线程数量为0),线程池就会进入该状态。
  • TERMINATED: 处于TIDYING状态的线程池调用terminated ()方法,会使用线程池进入该状态

也可以按照ThreadPoolExecutor类的注释,将线程池的各状态之间的转化总结成如下图所示。

微信图片_20211118115640.jpg

  • RUNNING -> SHUTDOWN:显式调用shutdown()方法, 或者隐式调用了finalize()方法
  • (RUNNING or SHUTDOWN) -> STOP:显式调用shutdownNow()方法
  • SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候
  • STOP -> TIDYING:当线程池为空的时候
  • TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候

其他重要属性

除了ctl相关的属性外,ThreadPoolExecutor类中其他一些重要的属性如下所示。

//用于存放任务的阻塞队列  
private final BlockingQueue<Runnable> workQueue;
//可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//存放线程池中线程的集合,访问这个集合时,必须获得mainLock锁
private final HashSet<Worker> workers = new HashSet<Worker>();
//在锁内部阻塞等待条件完成
private final Condition termination = mainLock.newCondition();
//线程工厂,以此来创建新线程
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

ThreadPoolExecutor类中的重要内部类

在ThreadPoolExecutor类中存在对于线程池的执行至关重要的内部类,Worker内部类和拒绝策略内部类。接下来,我们分别看这些内部类。

Worker内部类

Worker类从源代码上来看,实现了Runnable接口,说明其本质上是一个用来执行任务的线程,接下来,我们看下Worker类的源代码,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    private static final long serialVersionUID = 6138294804551838833L;
    //真正执行任务的线程
    final Thread thread;
    //第一个Runnable任务,如果在创建线程时指定了需要执行的第一个任务
    //则第一个任务会存放在此变量中,此变量也可以为null
    //如果为null,则线程启动后,通过getTask方法到BlockingQueue队列中获取任务
    Runnable firstTask;
    //用于存放此线程完全的任务数,注意:使用了volatile关键字
    volatile long completedTasks;
    //Worker类唯一的构造放大,传递的firstTask可以为null
    Worker(Runnable firstTask) {
        //防止在调用runWorker之前被中断
        setState(-1);
        this.firstTask = firstTask;
        //使用ThreadFactory 来创建一个新的执行任务的线程
        this.thread = getThreadFactory().newThread(this);
    }
    //调用外部ThreadPoolExecutor类的runWorker方法执行任务
    public void run() {
        runWorker(this);
    }
    //是否获取到锁 
    //state=0表示锁未被获取
    //state=1表示锁被获取
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

在Worker类的构造方法中,可以看出,首先将同步状态state设置为-1,设置为-1是为了防止runWorker方法运行之前被中断。这是因为如果其他线程调用线程池的shutdownNow()方法时,如果Worker类中的state状态的值大于0,则会中断线程,如果state状态的值为-1,则不会中断线程。

Worker类实现了Runnable接口,需要重写run方法,而Worker的run方法本质上调用的是ThreadPoolExecutor类的runWorker方法,在runWorker方法中,会首先调用unlock方法,该方法会将state置为0,所以这个时候调用shutDownNow方法就会中断当前线程,而这个时候已经进入了runWork方法,就不会在还没有执行runWorker方法的时候就中断线程。

注意:大家需要重点理解Worker类的实现。

拒绝策略内部类

在线程池中,如果workQueue阻塞队列满了,并且没有空闲的线程池,此时,继续提交任务,需要采取一种策略来处理这个任务。而线程池总共提供了四种策略,如下所示。

  • 直接抛出异常,这也是默认的策略。实现类为AbortPolicy。
  • 用调用者所在的线程来执行任务。实现类为CallerRunsPolicy。
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为DiscardPolicy。

在ThreadPoolExecutor类中提供了4个内部类来默认实现对应的策略,如下所示。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

我们也可以通过实现RejectedExecutionHandler接口,并重写RejectedExecutionHandler接口的rejectedExecution方法来自定义拒绝策略,在创建线程池时,调用ThreadPoolExecutor的构造方法,传入我们自己写的拒绝策略。

例如,自定义的拒绝策略如下所示。

public class CustomPolicy implements RejectedExecutionHandler {
    public CustomPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            System.out.println("使用调用者所在的线程来执行任务")
            r.run();
        }
    }
}

使用自定义拒绝策略创建线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
               new CustomPolicy());
相关文章
|
2月前
|
存储 安全 Java
【Java集合类面试二十五】、有哪些线程安全的List?
线程安全的List包括Vector、Collections.SynchronizedList和CopyOnWriteArrayList,其中CopyOnWriteArrayList通过复制底层数组实现写操作,提供了最优的线程安全性能。
|
1月前
|
Java Spring
运行@Async注解的方法的线程池
自定义@Async注解线程池
67 3
|
2月前
|
存储 缓存 监控
函数计算产品使用问题之调用sd生图时,怎么保证高并发场景正常运行
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
2月前
|
安全 Java 调度
|
2月前
|
安全 Java 程序员
线程安全与 Vector 类的分析
【8月更文挑战第22天】
25 4
|
2月前
|
存储 缓存 NoSQL
Redis内存管理揭秘:掌握淘汰策略,让你的数据库在高并发下也能游刃有余,守护业务稳定运行!
【8月更文挑战第22天】Redis的内存淘汰策略管理内存使用,防止溢出。主要包括:noeviction(拒绝新写入)、LRU/LFU(淘汰最少使用/最不常用数据)、RANDOM(随机淘汰)及TTL(淘汰接近过期数据)。策略选择需依据应用场景、数据特性和性能需求。可通过Redis命令行工具或配置文件进行设置。
52 2
|
2月前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
72 2
|
2月前
|
安全 Java API
Java多线程编程:使用Atomic类实现原子操作
在Java多线程环境中,共享资源的并发访问可能导致数据不一致。传统的同步机制如`synchronized`关键字或显式锁虽能保障数据一致性,但在高并发场景下可能导致线程阻塞和性能下降。为此,Java提供了`java.util.concurrent.atomic`包下的原子类,利用底层硬件的原子操作确保变量更新的原子性,实现无锁线程安全。
16 0
【多线程面试题 二】、 说说Thread类的常用方法
Thread类的常用方法包括构造方法(如Thread()、Thread(Runnable target)等)、静态方法(如currentThread()、sleep(long millis)、yield()等)和实例方法(如getId()、getName()、interrupt()、join()等),用于线程的创建、控制和管理。
|
2月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
70 1