异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解2

简介: 异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解2

代码3执行清理任务,其代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
     ...
    //(3.1)统计整个线程池完成的任务个数,并从工作集里面删除当前woker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //(3.2)尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
    //或者当前是stop状态且当前线程池里面没有活动线程
    tryTerminate();
    //(3.3)如果当前线程个数小于核心个数,则增加
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}


代码3.1统计线程池完成任务个数,可知在统计前加了全局锁,把当前工作线程中完成的任务累加到全局计数器,然后从工作集中删除当前Worker。


代码3.2判断如果当前线程池状态是shutdown状态并且工作队列为空,或者当前是stop状态并且当前线程池里面没有活动线程,则设置线程池状态为TERMINATED。


代码3.3判断当前线程中的线程个数是否小于核心线程个数,如果是则新增一个线程。




关闭线程池原理解析


线程池中有两种模式的线程池关闭方法



public void shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //(1)权限检查
        checkShutdownAccess();
        //(2)设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回
        advanceRunState(SHUTDOWN);
        //(3)设置中断标志
        interruptIdleWorkers();
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //(4)尝试状态变为TERMINATED
    tryTerminate();
}


代码1检查如果设置了安全管理器,则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。


代码2的内容如下,如果当前状态>=SHUTDOWN则直接返回,否则设置当前状态为SHUTDOWN:


private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}


代码3的内容如下,设置所有空闲线程的中断标志,这里首先加了全局锁,同时只有一个线程可以调用shutdown设置中断标志。然后尝试获取Worker本身的锁,获取成功则设置中断标识,由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法,企图从队列里获取任务的线程,也就是空闲线程。


private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果工作线程没有被中断,并且没有正在运行则设置中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}


代码4尝试将线程池的状态变为TERMINATED,tryTerminate的代码如下:

final void tryTerminate() {
        for (;;) {
            ...
            int c = ctl.get();
            ...
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {//设置当前线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //设置当前线程池状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //激活调用条件变量termination的await系列方法被阻塞的所有线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }


如上述代码所示,首先使用CAS设置当前线程池状态为TIDYING,如果成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED,最后调用termination.signalAll()来激活调用线程池的awaitTermination系列方法被阻塞的所有线程。



public void shutdownNow()

下面我们来看public void shutdownNow()方法的代码逻辑:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//(5)权限检查
        advanceRunState(STOP);//(6) 设置线程池状态为stop
        interruptWorkers();//(7)中断所有线程
        tasks = drainQueue();//(8)移动队列任务到tasks
    } finally {
        mainLock.unlock();
    }
    //(9)终止状态
    tryTerminate();
    return tasks;
}


首先调用代码5检查权限,然后调用代码6设置当前线程池状态为STOP,接着执行代码7中断所有的工作线程,这里需要注意的是中断所有线程,包含空闲线程和正在执行任务的线程:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}


然后调用代码8将当前任务队列的任务移动到tasks列表,代码如下:

  private List<Runnable> drainQueue() {
    //8.1获取任务队列
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    //8.2 从任务队列移除任务到taskList列表
    q.drainTo(taskList);
    //8.3 如果q还不为空,则说明drainTo接口调用失效,则循环移除
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    //8.4返回异常的任务列表
    return taskList;
}


由上述代码可知,调用线程池队列的drainTo方法把队列中的任务移除到taskList里,如果发现线程池队列还不为空(比如DelayQueue或者其他类型的队列drainTo可能移除元素失败),则循环移除里面的元素,最后返回移除的任务列表。



线程池的拒绝策略解析


线程池是通过池化少量线程来提供线程复用的,当调用线程向线程池中投递大量任务后,线程池可能就处于饱和状态了。所谓饱和状态是指当前线程池队列已经满了,并且线程池中的线程已经达到了最大线程个数。当线程池处于饱和状态时,再向线程池投递任务,而对于投递的任务如何处理,是由线程池拒绝策略决定的。拒绝策略的执行是在execute方法,大家可以返回前面章节查看。


线程池中提供了RejectedExecutionHandler接口,用来提供对线程池拒绝策略的抽象,其定义如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}



线程池中提供了一系列该接口的实现供我们使用




AbortPolicy


首先我们看下AbortPolicy策略的代码:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    /**
     * 抛出RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}


由上述代码可知,该拒绝策略执行时会直接向调用线程抛出RejectedExecutionException异常,并丢失提交的任务。



CallerRunsPolicy


然后我们看下CallerRunsPolicy策略的代码:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    /**
     * 使用调用线程执行任务r
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

分析上述代码,该拒绝策略执行时,如果线程池没有被关闭,则会直接使用调用线程执行提交的任务r,否则默默丢弃该任务。



DiscardPolicy


然后我们看下DiscardPolicy策略的代码:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    /**
     * 什么都不做,默默丢弃提交的任务
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

该拒绝策略执行时,什么都不做,默默丢弃提交的任务。



DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        /**
         * 丢弃线程池队列里面最老的任务,并把当前任务提交到线程池
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();//移除队首元素
                e.execute(r);//提交任务r到线程池执行
            }
        }
    }


该拒绝策略首先会丢弃线程池队列里面最老的任务,然后把当前任务r提交到线程池。

相关文章
|
24天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
51 12
|
1月前
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
152 29
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
41 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
37 1
|
3月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
124 5
|
2月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
62 0
|
2月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
42 0
|
3月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
1天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
12 1