【高并发】通过源码深度分析线程池中Worker线程的执行流程

简介: 在《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文中我们深度分析了线程池执行任务的核心流程,在ThreadPoolExecutor类的addWorker(Runnable, boolean)方法中,使用CAS安全的更新线程的数量之后,接下来就是创建新的Worker线程执行任务,所以,我们先来分析下Worker类的源码。

大家好,我是冰河~~

在《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文中我们深度分析了线程池执行任务的核心流程,在ThreadPoolExecutor类的addWorker(Runnable, boolean)方法中,使用CAS安全的更新线程的数量之后,接下来就是创建新的Worker线程执行任务,所以,我们先来分析下Worker类的源码。

Worker类分析

Worker类从类的结构上来看,继承了AQS(AbstractQueuedSynchronizer类)并实现了Runnable接口。本质上,Worker类既是一个同步组件,也是一个执行任务的线程。接下来,我们看下Worker类的源码,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    //执行任务的线程类
    final Thread thread;
    //初始化执行的任务,第一次执行的任务
    Runnable firstTask;
    //完成任务的计数
    volatile long completedTasks;
    //Worker类的构造方法,初始化任务并调用线程工厂创建执行任务的线程
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    //重写Runnable接口的run()方法
    public void run() {
        //调用ThreadPoolExecutor类的runWorker(Worker)方法
        runWorker(this);
    }

    //检测是否是否获取到锁
    //state=0表示未获取到锁
    //state=1表示已获取到锁
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    
    //使用AQS设置线程状态
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    //尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

在Worker类的构造方法中,可以看出,首先将同步状态state设置为-1,设置为-1是为了防止runWorker方法运行之前被中断。这是因为如果其他线程调用线程池的shutdownNow()方法时,如果Worker类中的state状态的值大于0,则会中断线程,如果state状态的值为-1,则不会中断线程。

Worker类实现了Runnable接口,需要重写run方法,而Worker的run方法本质上调用的是ThreadPoolExecutor类的runWorker方法,在runWorker方法中,会首先调用unlock方法,该方法会将state置为0,所以这个时候调用shutDownNow方法就会中断当前线程,而这个时候已经进入了runWork方法,就不会在还没有执行runWorker方法的时候就中断线程。

注意:大家需要重点理解Worker类的实现。

Worker类中调用了ThreadPoolExecutor类的runWorker(Worker)方法。接下来,我们一起看下ThreadPoolExecutor类的runWorker(Worker)方法的实现。

runWorker(Worker)方法

首先,我们看下RunWorker(Worker)方法的源码,如下所示。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //释放锁,将state设置为0,允许中断任务的执行
    w.unlock();
    boolean completedAbruptly = true;
    try {
        //如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环
        while (task != null || (task = getTask()) != null) {
            //如果任务不为空,则获取Worker工作线程的独占锁
            w.lock();
            //如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
            //大家好好理解下这个逻辑
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中断线程
                wt.interrupt();
            try {
                //执行任务前执行的逻辑
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //调用Runable接口的run方法执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务后执行的逻辑
                    afterExecute(task, thrown);
                }
            } finally {
                //任务执行完成后,将其设置为空
                task = null;
                //完成的任务数量加1
                w.completedTasks++;
                //释放工作线程获得的锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //执行退出Worker线程的逻辑
        processWorkerExit(w, completedAbruptly);
    }
}

这里,我们拆解runWorker(Worker)方法。

(1)获取当前线程的句柄和工作线程中的任务,并将工作线程中的任务设置为空,执行unlock方法释放锁,将state状态设置为0,此时可以中断工作线程,代码如下所示。

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁,将state设置为0,允许中断任务的执行
w.unlock();

(2)在while循环中进行判断,如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环,否则,调用processWorkerExit(Worker, boolean)方法退出Worker工作线程。

while (task != null || (task = getTask()) != null)

(3)如果满足while的循环条件,首先获取工作线程内部的独占锁,并执行一系列的逻辑判断来检测是否需要中断当前线程的执行,代码如下所示。

//如果任务不为空,则获取Worker工作线程的独占锁
w.lock();
//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
//大家好好理解下这个逻辑
if ((runStateAtLeast(ctl.get(), STOP) ||
     (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    //中断线程
    wt.interrupt();

(4)调用执行任务前执行的逻辑,如下所示

//执行任务前执行的逻辑
beforeExecute(wt, task);

(5)调用Runable接口的run方法执行任务

//调用Runable接口的run方法执行任务
task.run();

(6)调用执行任务后执行的逻辑

//执行任务后执行的逻辑
afterExecute(task, thrown);

(7)将完成的任务设置为空,完成的任务数量加1并释放工作线程的锁。

//任务执行完成后,将其设置为空
task = null;
//完成的任务数量加1
w.completedTasks++;
//释放工作线程获得的锁
w.unlock();

(8)退出Worker线程的执行,如下所示

//执行退出Worker线程的逻辑
processWorkerExit(w, completedAbruptly);

从代码分析上可以看到,当从Worker线程中获取的任务为空时,会调用getTask()方法从任务队列中获取任务,接下来,我们看下getTask()方法的实现。

getTask()方法

我们先来看下getTask()方法的源代码,如下所示。

private Runnable getTask() {
    //轮询是否超时的标识
    boolean timedOut = false;
    //自旋for循环
    for (;;) {
        //获取ctl
        int c = ctl.get();
        //获取线程池的状态
        int rs = runStateOf(c);

        //检测任务队列是否在线程池停止或关闭的时候为空
        //也就是说任务队列是否在线程池未正常运行时为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //减少Worker线程的数量
            decrementWorkerCount();
            return null;
        }
        //获取线程池中线程的数量
        int wc = workerCountOf(c);

        //检测当前线程池中的线程数量是否大于corePoolSize的值或者是否正在等待执行任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //如果线程池中的线程数量大于corePoolSize
        //获取大于corePoolSize或者是否正在等待执行任务并且轮询超时
        //并且当前线程池中的线程数量大于1或者任务队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //成功减少线程池中的工作线程数量
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //从任务队列中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            //任务不为空直接返回任务
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask()方法的逻辑比较简单,大家看源码就可以了,我这里就不重复描述了。

接下来,我们看下在正式调用Runnable的run()方法前后,执行的beforeExecute方法和afterExecute方法。

beforeExecute(Thread, Runnable)方法

beforeExecute(Thread, Runnable)方法的源代码如下所示。

protected void beforeExecute(Thread t, Runnable r) { }

可以看到,beforeExecute(Thread, Runnable)方法的方法体为空,我们可以创建ThreadPoolExecutor的子类来重写beforeExecute(Thread, Runnable)方法,使得线程池正式执行任务之前,执行我们自己定义的业务逻辑。

afterExecute(Runnable, Throwable)方法

afterExecute(Runnable, Throwable)方法的源代码如下所示。

protected void afterExecute(Runnable r, Throwable t) { }

可以看到,afterExecute(Runnable, Throwable)方法的方法体同样为空,我们可以创建ThreadPoolExecutor的子类来重写afterExecute(Runnable, Throwable)方法,使得线程池在执行任务之后执行我们自己定义的业务逻辑。

接下来,就是退出工作线程的processWorkerExit(Worker, boolean)方法。

processWorkerExit(Worker, boolean)方法

processWorkerExit(Worker, boolean)方法的逻辑主要是执行退出Worker线程,并且对一些资源进行清理,源代码如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //执行过程中出现了异常,突然中断
    if (completedAbruptly)
        //将工作线程的数量减1
        decrementWorkerCount();
    //获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //累加完成的任务数量
        completedTaskCount += w.completedTasks;
        //将完成的任务从workers集合中移除
        workers.remove(w);
    } finally {
        //释放锁
        mainLock.unlock();
    }
    //尝试终止工作线程的执行
    tryTerminate();
    //获取ctl
    int c = ctl.get();
    //判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        //如果没有突然中断完成
        if (!completedAbruptly) {
            //如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果min为0并且工作队列不为空
            if (min == 0 && ! workQueue.isEmpty())
                //min的值设置为1
                min = 1;
            //如果线程池中的线程数量大于min的值
            if (workerCountOf(c) >= min)
                //返回,不再执行程序
                return; 
        }
        //调用addWorker方法
        addWorker(null, false);
    }
}

接下来,我们拆解processWorkerExit(Worker, boolean)方法。

(1)执行过程中出现了异常,突然中断执行,则将工作线程数量减1,如下所示。

//执行过程中出现了异常,突然中断
if (completedAbruptly)
    //将工作线程的数量减1
    decrementWorkerCount();

(2)获取锁累加完成的任务数量,并将完成的任务从workers集合中移除,并释放,如下所示。

//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    //累加完成的任务数量
    completedTaskCount += w.completedTasks;
    //将完成的任务从workers集合中移除
    workers.remove(w);
} finally {
    //释放锁
    mainLock.unlock();
}

(3)尝试终止工作线程的执行

//尝试终止工作线程的执行
tryTerminate();

(4)处判断当前线程池中的线程个数是否小于核心线程数,如果是,需要新增一个线程保证有足够的线程可以执行任务队列中的任务或者提交的任务。

//获取ctl
int c = ctl.get();
//判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)
if (runStateLessThan(c, STOP)) {
    //如果没有突然中断完成
    if (!completedAbruptly) {
        //如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSize
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        //如果min为0并且工作队列不为空
        if (min == 0 && ! workQueue.isEmpty())
            //min的值设置为1
            min = 1;
        //如果线程池中的线程数量大于min的值
        if (workerCountOf(c) >= min)
            //返回,不再执行程序
            return; 
    }
    //调用addWorker方法
    addWorker(null, false);
}

接下来,我们看下tryTerminate()方法。

tryTerminate()方法

tryTerminate()方法的源代码如下所示。

final void tryTerminate() {
    //自旋for循环
    for (;;) {
        //获取ctl
        int c = ctl.get();
        //如果线程池的状态为RUNNING
        //或者状态大于TIDYING
        //或者状态为SHUTDOWN并且任务队列为空
        //直接返回程序,不再执行后续逻辑
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果当前线程池中的线程数量不等于0
        if (workerCountOf(c) != 0) { 
            //中断线程的执行
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //获取线程池的全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //通过CAS将线程池的状态设置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //调用terminated()方法
                    terminated();
                } finally {
                    //将线程池状态设置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            //释放锁
            mainLock.unlock();
        }
    }
}

(1)获取ctl,根据情况设置线程池状态或者中断线程的执行,并返回。

//获取ctl
int c = ctl.get();
//如果线程池的状态为RUNNING
//或者状态大于TIDYING
//或者状态为SHUTDOWN并且任务队列为空
//直接返回程序,不再执行后续逻辑
if (isRunning(c) ||
    runStateAtLeast(c, TIDYING) ||
    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    return;
//如果当前线程池中的线程数量不等于0
if (workerCountOf(c) != 0) { 
    //中断线程的执行
    interruptIdleWorkers(ONLY_ONE);
    return;
}

(2)获取全局锁,通过CAS设置线程池的状态,调用terminated()方法执行逻辑,最终将线程池的状态设置为TERMINATED,唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程,最终释放锁,如下所示。

//获取线程池的全局
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    //通过CAS将线程池的状态设置为TIDYING
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            //调用terminated()方法
            terminated();
        } finally {
            //将线程池状态设置为TERMINATED
            ctl.set(ctlOf(TERMINATED, 0));
            //唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程
            termination.signalAll();
        }
        return;
    }
} finally {
    //释放锁
    mainLock.unlock();
}

接下来,看下terminated()方法。

terminated()方法

terminated()方法的源代码如下所示。

protected void terminated() { }

可以看到,terminated()方法的方法体为空,我们可以创建ThreadPoolExecutor的子类来重写terminated()方法,值得Worker线程调用tryTerminate()方法时执行我们自己定义的terminated()方法的业务逻辑。

好了,今天就到这儿吧,我是冰河,我们下期见~~

目录
相关文章
|
27天前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
71 0
|
13天前
|
JavaScript Java 测试技术
基于Java的高并发慕课网的设计与实现(源码+lw+部署文档+讲解等)
基于Java的高并发慕课网的设计与实现(源码+lw+部署文档+讲解等)
23 2
|
23天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
44 5
|
23天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue
`LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。 `xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹
35 5
|
23天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
42 2
|
23天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
32 1
|
24天前
|
Java 测试技术 Python
Python开启线程和线程池的方法
Python开启线程和线程池的方法
17 0
Python开启线程和线程池的方法
|
27天前
|
安全 Java 调度
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
43 2
|
28天前
|
存储 算法 Linux
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
25 0
|
1月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
52 2

热门文章

最新文章