如何解决JDK线程池中不超过最大线程数下快速消费任务

简介: 如何解决JDK线程池中不超过最大线程数下快速消费任务

在这里插入图片描述

前言

文章需要对线程池执行任务流程有一定的了解

记得之前我写通过模版设计来解决 线程池参数自定义痛点, 然后宽哥在下面灵魂发问, 也就是咱们这篇文章讲到的重点

日常灵魂发问

来来来, 我给大家复制粘贴出来

如何解决 JDK 线程池中不超过最大线程数下即时快速消费任务, 而不是在队列中堆积

因为最近业务落地改造中需要线程池, 又去看了一遍源码, 防止线上埋雷, 也再次回顾了这个问题

然后发现网上也有这种问题提问, 虽然是不同的提问, 但是核心思想是一致的, 点击跳转

业务是多变的, 而 JDK 中的线程池消费流程却是固定的, 所以 基于阻塞队列、线程池扩展改变了原有流程

01、线程池参数

我们这里讲解以 ThreadPoolExecutor#execute(Runnable runnable) 举例, 这里先说下线程池的一些参数

本篇只是说明上述问题, 不会对线程池做详细讲解
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

corePoolSize

线程池中的核心线程数量, 如果没有全局设置池内线程的过期时间, 池内会维持此数量线程

maximumPoolSize

线程池中的最大线程数量, 当核心线程都在运行任务, 并且阻塞队列中任务数量已满, 此时会创建非核心线程

keepAliveTime & unit

线程池中线程过期时间以及时间单位

workQueue

存放线程池内任务的阻塞队列, 如 ArrayBlockingQueue、LinkedBlockingQueue...

threadFactory

创建线程池中线程的线程工厂, 可以在创建线程时初始化优先级、名称、守护状态...

handler

当线程池中全部线程都在运行, 阻塞队列也满的时候, 会将添加的任务执行拒绝策略, JDK 线程池中实现了四种拒绝策略, 默认 AbortPolicy, 抛出异常

02、线程池任务添加流程

相信大家在网上看到过许多类似的线程池执行流程图哈, 这里还是简要赘述下, 源码如下:

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

1、线程池提交任务首先判断当前线程数是否大于核心线程数, 否则创建核心线程执行任务

2、如果当前线程超过了核心线程数, 判断阻塞队列是否已满, 否则将任务添加到队列中

3、如果阻塞队列已满, 判断当前线程是否大于最大线程数, 否则创建非核心线程执行任务

4、如果当前线程大于或等于最大线程数, 执行拒绝策略

线程池任务提交流程

这道问题的意图就是要将第二步就行改写

如果当前线程大于核心线程数, 不将任务放入阻塞队列, 而是创建非核心线程执行任务

举例说明一下:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(1, 3, 60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue(10));

    for (int i = 0; i < 7; i++) {
        threadPoolExecutor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "-执行任务");
            LockSupport.park();
        });
    }
    threadPoolExecutor.shutdown();
    /**
     * pool-1-thread-1执行任务
     */
}

看到这段代码, 正常情况下只会有一个任务会被执行, 其余任务会被放置阻塞队列中

而我们需要做的就是, 发现池内线程大于核心线程数, 不放入阻塞队列, 而是创建非核心线程进行消费任务

本地代码实现参考 Dubbo 源码中 EagerThreadPoolExecutor, 确实能实现对应效果, 这里就不演示了, 一起看一下 Dubbo 如何做的

03、Dubbo 中实现的快速消费

Dubbo 中涉及到的类有两个, EagerThreadPoolExecutorTaskQueue

这里贴一下重点代码

3.1 TaskQueue

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
        ...
    // 队列中持有线程池的引用
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
                ...
        // 获取线程池中线程数
        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

          /**
           *【重点】当前线程池线程数量小于最大线程数
           * 返回false, 根据线程池源码, 会创建非核心线程
           */
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
        return super.offer(runnable);
    }
}  

存在一个疑点, getSubmittedTaskCount() 是如何获取提交任务数量的?

这里就需要看一下 EagerThreadPoolExecutor 实现了, 也比较简单, 只是 重写了线程池的两个方法: afterExecute()、execute()

3.2 EagerThreadPoolExecutor

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
  
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

EagerThreadPoolExecutor 继承了 ThreadPoolExecutor, 在 execute() 上做了个性化设计

并在线程池内新增了一个任务数量的字段, 是一个原子类, 添加任务时自增, 任务异常及结束时递减

这样就能保证 TaskQueue#offer(Runnable runnable) 做出逻辑处理

相关文章
|
28天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
104 38
|
26天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
52 2
|
28天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
65 4
|
28天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
110 2
|
1月前
|
Dubbo Java 应用服务中间件
剖析Tomcat线程池与JDK线程池的区别和联系!
剖析Tomcat线程池与JDK线程池的区别和联系!
114 0
剖析Tomcat线程池与JDK线程池的区别和联系!
|
30天前
|
监控 数据可视化 Java
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
|
2月前
|
Java
安装JDK18没有JRE环境的解决办法
安装JDK18没有JRE环境的解决办法
346 3
|
3月前
|
Java 关系型数据库 MySQL
"解锁Java Web传奇之旅:从JDK1.8到Tomcat,再到MariaDB,一场跨越数据库的冒险安装盛宴,挑战你的技术极限!"
【8月更文挑战第19天】在Linux上搭建Java Web应用环境,需安装JDK 1.8、Tomcat及MariaDB。本指南详述了使用apt-get安装OpenJDK 1.8的方法,并验证其版本。接着下载与解压Tomcat至`/usr/local/`目录,并启动服务。最后,通过apt-get安装MariaDB,设置基本安全配置。完成这些步骤后,即可验证各组件的状态,为部署Java Web应用打下基础。
59 1
|
3月前
|
Oracle Java 关系型数据库
Mac安装JDK1.8
Mac安装JDK1.8
717 4
|
1月前
|
Oracle Java 关系型数据库
jdk17安装全方位手把手安装教程 / 已有jdk8了,安装JDK17后如何配置环境变量 / 多个不同版本的JDK,如何配置环境变量?
本文提供了详细的JDK 17安装教程,包括下载、安装、配置环境变量的步骤,并解释了在已有其他版本JDK的情况下如何管理多个JDK环境。
868 0