【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 )

简介: 【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 )

文章目录

一、线程池 reject 拒绝任务

二、线程池 addWorker 添加任务



在上一篇博客 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 ) 中 , 讲解 线程池 ThreadPoolExecutor 的 execute 方法时 , 有两个重要的核心方法 ;


两个核心的操作 :


添加任务 : addWorker(command, true) , 第二个参数为 true 是添加核心线程任务 , 第二个参数为 false 是添加非核心线程任务 ;

拒绝任务 : reject(command)

本博客中研究 拒绝任务 reject 方法的细节 ;






一、线程池 reject 拒绝任务


在 ThreadPoolExecutor 线程池中 , void reject(Runnable command) 方法 , 主要是调用了 RejectedExecutionHandler handler 的 rejectedExecution 方法 ;


该 handler 可以在如下 构造函数中传入 , 如下构造函数中的最后一个参数 ;


 

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {


部分相关代码示例 :


public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 当线程池任务饱和 , 或线程池关闭 , 使用该 Handler 处理拒绝任务异常情况 ;
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 在该构造函数中 , 可以传入 Handler ;
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    /**
     * 拒绝执行给定命令的处理 
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
}



自定义 RejectedExecutionHandler 处理拒绝任务的情况 ;


实现 rejectedExecution 方法 , 当 线程池任务队列饱和 , 或者 没有空闲线程 时 , 线程池被关闭 时 , 导致线程池的任务队列无法接受该任务时 , 会调用该方法



/**
 * 处理添加任务失败情况的 handler 
 * 用户可以自定义该接口 
 */
public interface RejectedExecutionHandler {
    /**
     * 
     * 无法接受任务时执行该方法 ;
     * 当线程池任务队列饱和 , 或者没有空闲线程时 , 线程池被关闭时 , 会调用该方法
     *
     * 该方法可能会触发 RejectedExecutionException 异常 , 用户需要捕获并处理该异常
     *
     * @param r 被拒绝的任务
     * @param executor 尝试执行该任务的执行者
     * @throws RejectedExecutionException 如果没有补救方法, 抛出该异常
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}






二、线程池 addWorker 添加任务


检查一个新的工作者 ( Worker ) 是否可以被添加 , 根据当前的 线程池状态 , 和 给定的核心线程数 , 最大线程数 等判定 ;


如果可以添加 , 那么先 调整 工作者 ( Worker ) 的个数 , 然后 创建新的 工作者 ( Worker ) , 并 将参数中的 Runnable firstTask 设置为第一个任务 ;


如果线程池停止或关闭 , 返回 false ;


如果线程创建失败 , 不管是线程工厂返回空 , 还是出现 OOM , 直接退出 ;



如果当前的 线程个数少于 核心线程数 , 或者当前的 任务数已满 , 则必须创建 工作者 ( Worker ) , 并执行第一个初始任务 ,


public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 检查一个新的工作者是否可以被添加 , 根据当前的线程池状态 , 和给定的核心线程数 , 最大线程数等判定 ;
     * 如果可以添加 , 那么先调整 工作者 ( Worker ) 的个数 , 然后创建新的 工作者 ( Worker ) , 
     * 并将参数中的 Runnable firstTask 设置为第一个任务 ; 
     * 如果线程池停止或关闭 , 返回 false ; 
     * 如果线程创建失败 , 不管是线程工厂返回空 , 还是出现 OOM , 直接退出 ; 
     *
     * 第一个运行的任务 ; 
     * 如果当前的线程个数少于 核心线程数 , 或者当前的任务数已满 , 
     * 必须创建 工作者 ( Worker ) , 并执行第一个初始任务 , 
     *
     * @param core 如果设置为 true , 必须使用核心线程绑定 
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 查看队列是否为空
            // 查看线程池是否 SHUTDOWN 
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    // 查看线程池的线程个数 , 与 核心线程数, 最大线程数 进行各种对比
    // 获取现在的线程池情况 
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
    // 如果没有达到最大线程数 , 允许添加 
                if (compareAndIncrementWorkerCount(c))  // 该行代码只是将计数器 +1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
  // 下面是添加一个线程的逻辑  
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          // 创建工作者 , 并将任务设置给该工作者 
            w = new Worker(firstTask);
            // 线程是从工作者中取出的 , 该线程是在 Worker 构造函数中使用线程工厂创建的 
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
      // 将工作者设置给 工作者集合 
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果添加成功 , 就会启动 Worker 中的线程 
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
}





目录
相关文章
|
5月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
307 60
【Java并发】【线程池】带你从0-1入门线程池
|
3月前
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
93 18
|
6月前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
5月前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
5月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
181 0
|
6月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
238 17
|
3天前
|
安全 数据库 Android开发
在Android开发中实现两个Intent跳转及数据交换的方法
总结上述内容,在Android开发中,Intent不仅是活动跳转的桥梁,也是两个活动之间进行数据交换的媒介。运用Intent传递数据时需注意数据类型、传输大小限制以及安全性问题的处理,以确保应用的健壯性和安全性。
36 11
|
1月前
|
安全 Java Android开发
为什么大厂要求安卓开发者掌握Kotlin和Jetpack?深度解析现代Android开发生态优雅草卓伊凡
为什么大厂要求安卓开发者掌握Kotlin和Jetpack?深度解析现代Android开发生态优雅草卓伊凡
77 0
为什么大厂要求安卓开发者掌握Kotlin和Jetpack?深度解析现代Android开发生态优雅草卓伊凡
|
4月前
|
JavaScript Linux 网络安全
Termux安卓终端美化与开发实战:从下载到插件优化,小白也能玩转Linux
Termux是一款安卓平台上的开源终端模拟器,支持apt包管理、SSH连接及Python/Node.js/C++开发环境搭建,被誉为“手机上的Linux系统”。其特点包括零ROOT权限、跨平台开发和强大扩展性。本文详细介绍其安装准备、基础与高级环境配置、必备插件推荐、常见问题解决方法以及延伸学习资源,帮助用户充分利用Termux进行开发与学习。适用于Android 7+设备,原创内容转载请注明来源。
789 76