Java线程池ThreadPoolExcutor源码解读详解09-4种拒绝策略

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测链路 OpenTelemetry 版,每月50GB免费额度
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 本文介绍了线程池的四种拒绝策略:AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy,并通过代码示例展示了它们在任务过多时的不同处理方式。AbortPolicy会抛出异常并停止主线程;DiscardPolicy会默默丢弃新任务;DiscardOldestPolicy会抛弃队列中最旧的任务来接纳新任务;而CallerRunsPolicy则是由调用者线程执行被拒绝的任务,以减缓新任务的提交速度。这四种策略适用于不同的场景,开发者可以根据需求选择合适的策略。

💪🏻 制定明确可量化的目标,坚持默默的做事。





一、继承实现关系图

线程池的四种拒绝策略实现关系图:

image.gif image.png

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常并且阻止主线程正常运行
  • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
  • CallerRunsPolicy:“调用者运行“一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,让调用者去处理,比如main调用了线程池,线程池处理不了就让main慢慢处理去,从而降低新任务的流量

二、拒绝策略测试示例

// 测试任务类
class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(1100);
                System.out.println("thread name: " + this.name);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

image.gif

2.1 AbortPolicy策略

测试代码块:

// ThreadPoolExcecutor类中
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// 测试类
private void abortPolicy() {
    ThreadFactory factory = r -> new Thread(r, "AbortPolicy ThreadFactory");
    // 创建核心线程和最大线程为2的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
            2,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), factory);
    int i = 0;
    int n = 100;
    // 添加100个任务
    while (i++ < n) {
        threadPoolExecutor.execute(new ThreadPolicyTester.Task("" + i));
    }
}

image.gif

运行如上代码打印如下:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task w.cx.lrn.java.interview.threadpool.ThreadPolicyTester$Task@4d405ef7 rejected from java.util.concurrent.ThreadPoolExecutor@6193b845[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
  at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
  at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
  at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
  at w.cx.lrn.java.interview.threadpool.ThreadPolicyTester.abortPolicy(ThreadPolicyTester.java:34)
  at w.cx.lrn.java.interview.threadpool.ThreadPolicyTester.main(ThreadPolicyTester.java:18)
thread name: 1
thread name: 2
thread name: 3
thread name: 4

image.gif

说明:

  • 没有指定拒绝策略,默认为AbortPolicy拒绝策略
  • 最大线程数为2,则有两个任务在运行
  • 阻塞队列数为2,则阻塞队列中可添加2个任务
  • 添加第五个任务的时候抛出了 RejectedExecutionException异常并且阻止主线程正常运行(没有再尝试添加第六个线程),所以只打印前4个任务1,2,3和4任务

2.2 DiscardPolicy策略

测试代码块:(添了拒绝策略)

private void discardPolicy() {
    ThreadFactory factory2 = r -> new Thread(r, "DiscardPolicy ThreadFactory");
    // 核心线程数和、最大线程数 和 阻塞队列大小均为2
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
            2,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), factory2, new ThreadPoolExecutor.DiscardPolicy());
    int i = 0;
    int n = 10;
    // 添加10个任务
    while (i++ < n) {
        threadPoolExecutor.execute(new ThreadPolicyTester.Task("" + i));
        System.out.println("添加了第 " + i + " 个任务!");
    }
}

image.gif

运行如上代码打印如下:

添加了第 1 个任务!
添加了第 2 个任务!
添加了第 3 个任务!
添加了第 4 个任务!
添加了第 5 个任务!
添加了第 6 个任务!
添加了第 7 个任务!
添加了第 8 个任务!
添加了第 9 个任务!
添加了第 10 个任务!
thread name: 1
thread name: 2
thread name: 4
thread name: 3

image.gif

说明:

  • 指定拒绝策略为DiscardPolicy
  • 最大线程数为2,则有两个任务在运行
  • 阻塞队列数为2,则阻塞队列中可添加2个任务
  • 添加第5个到第10个任务都不予任何处理也不抛出异常,所以只打印前4个任务1,2,3和4任务

2.3 DiscardOldestPolicy策略

测试代码块:(添了DiscardOldestPolicy拒绝策略)

private void discardOldestPolicy() {
    ThreadFactory factory2 = r -> new Thread(r, "DiscardOldestPolicy ThreadFactory");
    // 核心线程数、最大线程数 和 阻塞队列大小均为2
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
            2,
            0,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), factory2, new ThreadPoolExecutor.DiscardOldestPolicy());
    int i = 0;
    int n = 10;
    // 添加10个任务
    while (i++ < n) {
        threadPoolExecutor.execute(new ThreadPolicyTester.Task("" + i));
        System.out.println("添加了第 " + i + " 个任务!");
    }
}

image.gif

运行如上代码打印如下:

添加了第 1 个任务!
添加了第 2 个任务!
添加了第 3 个任务!
添加了第 4 个任务!
添加了第 5 个任务!
添加了第 6 个任务!
添加了第 7 个任务!
添加了第 8 个任务!
添加了第 9 个任务!
添加了第 10 个任务!
thread name: 1
thread name: 2
thread name: 9
thread name: 10

image.gif

说明:

  • 指定拒绝策略为DiscardOldestPolicy
  • 最大线程数为2,则有两个任务在运行
  • 阻塞队列数为2,则阻塞队列中可添加2个任务
  • 拒绝策略为DiscardOldestPolicy,添加第5个时,则抛弃第3个任务添加第5个任务,添加第6个任务时,抛弃第4个任务添加第6个任务,依次类推最后阻塞队列中存放的任务为第9个和第10个,所以最后只打印前2个任务1,2和9、10任务

2.4 CallerRunsPolicy策略

测试代码块:(添了CallerRunsPolicy拒绝策略)

private void callerRunsPolicy() {
    ThreadFactory factory2 = r -> new Thread(r, "TestThreadPool");
    // 核心线程数、最大线程数 和 阻塞队列大小均为2
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
            2,
            0,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), factory2, new ThreadPoolExecutor.CallerRunsPolicy());
    int i = 0;
    int n = 10;
    // 添加10个任务
    while (i++ < n) {
        threadPoolExecutor.execute(new ThreadPolicyTester.Task("" + i));
        System.out.println("添加了第 " + i + " 个任务!");
    }
}

image.gif

为了方便辨别是主线程执行还是子线程执行,测试任务类的打印修改成如下:

 

System.out.println("thread name: " + Thread.currentThread().getName() + " " + this.name);

image.gif

运行如上代码打印如下:

添加了第 1 个任务!
添加了第 2 个任务!
添加了第 3 个任务!
添加了第 4 个任务!
thread name: callerRunsPolicy  2
thread name: callerRunsPolicy  1
thread name: main 5
添加了第 5 个任务!
添加了第 6 个任务!
添加了第 7 个任务!
thread name: callerRunsPolicy  3
thread name: main 8
添加了第 8 个任务!
添加了第 9 个任务!
thread name: callerRunsPolicy  4
thread name: callerRunsPolicy  6
thread name: main 10
thread name: callerRunsPolicy  7
添加了第 10 个任务!
thread name: callerRunsPolicy  9

image.gif

说明:(多线程环境下运行,每次打印都不一样)

  • 指定拒绝策略为CallerRunsPolicy
  • 最大线程数为2,则有两个任务在运行
  • 阻塞队列数为2,则阻塞队列中可添加2个任务
  • 10任务都打印出来了,被拒绝的第5、8 和 10任务是由主线程执行的,拒绝策略源码如下。就是说添加新任务被拒绝后,只要主线程创建的线程池没有关闭,由主线程来执行被拒绝的新任务。只有线程池关闭了,被拒绝的任务才被丢弃且不抛异常。

拒绝策略源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

image.gif

三、总结及应用场景

  • AbortPolicy拒绝策略:新任务不能再添加的时候,会直接抛出异常及时反馈程序运行状态,在一些比较重要的业务中,使用此策略在不能再处理更多的并发量的时候,就能及时的通过异常发现问题。
  • DiscardPolicy拒绝策略:在不能再添加新任务时此策略直接丢弃任务。比如在一些无关紧要的任务可用此策略。
  • DiscardOldestPolicy拒绝策略:新任务替换老任务。应用于任务分配有一定的容忍性的情况,弃用最老的任务以容纳新的任务,一般是可以接受的情况下。
  • CallerRunsPolicy拒绝策略:只要主线程创建的线程池没有关闭,则由主线程来执行任务。这种由主线程来一个一个执行被拒绝新任务,从而实现降低新任务的提交速度。
相关文章
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
11天前
|
Java
Java线程池如何执行?拒绝策略有哪些?
【8月更文挑战第8天】Java线程池如何执行?拒绝策略有哪些?
30 6
|
7天前
|
缓存 监控 Java
Java性能优化:从单线程执行到线程池管理的进阶实践
在Java开发中,随着应用规模的不断扩大和用户量的持续增长,性能优化成为了一个不可忽视的重要课题。特别是在处理大量并发请求或执行耗时任务时,单线程执行模式往往难以满足需求,这时线程池的概念便应运而生。本文将从应用场景举例出发,探讨Java线程池的使用,并通过具体案例和核心代码展示其在实际问题解决中的强大作用。
22 1
|
10天前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
29 4
|
8天前
|
Java
Java线程池核心数为0时,线程池如何执行?
【8月更文挑战第11天】Java线程池核心数为0时,线程池如何执行?
21 1
|
10天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
29 3
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
6天前
|
安全 Java 网络安全
云计算时代下的网络安全挑战与应对策略Java编程中的异常处理:从基础到高级
在云服务不断深入各行各业的今天,网络安全问题也随之凸显。本文将探讨云计算环境下的安全风险,并提出相应的防护措施,以期为相关行业提供参考和指导。 在Java的世界里,异常处理是代码健壮性的守护神。它不仅保护程序免于意外崩溃,还提供了一种优雅的方式来响应错误。本文将带你领略异常处理的艺术,从简单的try-catch语句到复杂的自定义异常和finally块的神秘力量,我们将一起探索如何让Java程序在面对不确定性时,依然能够优雅地起舞。