如何解决JDK线程池中不超过最大线程数下快速消费任务

简介: 如何解决JDK线程池中不超过最大线程数下快速消费任务

在这里插入图片描述

前言

文章需要对线程池执行任务流程有一定的了解

记得之前我写通过模版设计来解决 线程池参数自定义痛点, 然后宽哥在下面灵魂发问, 也就是咱们这篇文章讲到的重点

日常灵魂发问

来来来, 我给大家复制粘贴出来

如何解决 JDK 线程池中不超过最大线程数下即时快速消费任务, 而不是在队列中堆积

因为最近业务落地改造中需要线程池, 又去看了一遍源码, 防止线上埋雷, 也再次回顾了这个问题

然后发现网上也有这种问题提问, 虽然是不同的提问, 但是核心思想是一致的, 点击跳转

业务是多变的, 而 JDK 中的线程池消费流程却是固定的, 所以 基于阻塞队列、线程池扩展改变了原有流程

01、线程池参数

我们这里讲解以 ThreadPoolExecutor#execute(Runnable runnable) 举例, 这里先说下线程池的一些参数

本篇只是说明上述问题, 不会对线程池做详细讲解
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

corePoolSize

线程池中的核心线程数量, 如果没有全局设置池内线程的过期时间, 池内会维持此数量线程

maximumPoolSize

线程池中的最大线程数量, 当核心线程都在运行任务, 并且阻塞队列中任务数量已满, 此时会创建非核心线程

keepAliveTime & unit

线程池中线程过期时间以及时间单位

workQueue

存放线程池内任务的阻塞队列, 如 ArrayBlockingQueue、LinkedBlockingQueue...

threadFactory

创建线程池中线程的线程工厂, 可以在创建线程时初始化优先级、名称、守护状态...

handler

当线程池中全部线程都在运行, 阻塞队列也满的时候, 会将添加的任务执行拒绝策略, JDK 线程池中实现了四种拒绝策略, 默认 AbortPolicy, 抛出异常

02、线程池任务添加流程

相信大家在网上看到过许多类似的线程池执行流程图哈, 这里还是简要赘述下, 源码如下:

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

1、线程池提交任务首先判断当前线程数是否大于核心线程数, 否则创建核心线程执行任务

2、如果当前线程超过了核心线程数, 判断阻塞队列是否已满, 否则将任务添加到队列中

3、如果阻塞队列已满, 判断当前线程是否大于最大线程数, 否则创建非核心线程执行任务

4、如果当前线程大于或等于最大线程数, 执行拒绝策略

线程池任务提交流程

这道问题的意图就是要将第二步就行改写

如果当前线程大于核心线程数, 不将任务放入阻塞队列, 而是创建非核心线程执行任务

举例说明一下:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(1, 3, 60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue(10));

    for (int i = 0; i < 7; i++) {
        threadPoolExecutor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "-执行任务");
            LockSupport.park();
        });
    }
    threadPoolExecutor.shutdown();
    /**
     * pool-1-thread-1执行任务
     */
}

看到这段代码, 正常情况下只会有一个任务会被执行, 其余任务会被放置阻塞队列中

而我们需要做的就是, 发现池内线程大于核心线程数, 不放入阻塞队列, 而是创建非核心线程进行消费任务

本地代码实现参考 Dubbo 源码中 EagerThreadPoolExecutor, 确实能实现对应效果, 这里就不演示了, 一起看一下 Dubbo 如何做的

03、Dubbo 中实现的快速消费

Dubbo 中涉及到的类有两个, EagerThreadPoolExecutorTaskQueue

这里贴一下重点代码

3.1 TaskQueue

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
        ...
    // 队列中持有线程池的引用
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
                ...
        // 获取线程池中线程数
        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

          /**
           *【重点】当前线程池线程数量小于最大线程数
           * 返回false, 根据线程池源码, 会创建非核心线程
           */
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
        return super.offer(runnable);
    }
}  

存在一个疑点, getSubmittedTaskCount() 是如何获取提交任务数量的?

这里就需要看一下 EagerThreadPoolExecutor 实现了, 也比较简单, 只是 重写了线程池的两个方法: afterExecute()、execute()

3.2 EagerThreadPoolExecutor

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
  
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

EagerThreadPoolExecutor 继承了 ThreadPoolExecutor, 在 execute() 上做了个性化设计

并在线程池内新增了一个任务数量的字段, 是一个原子类, 添加任务时自增, 任务异常及结束时递减

这样就能保证 TaskQueue#offer(Runnable runnable) 做出逻辑处理

相关文章
|
7天前
|
Java 程序员 数据库
Java线程池让使用线程变得更加高效
使用一个线程需要经过创建、运行、销毁三大步骤,如果业务系统每个线程都要经历这个过程,那会带来过多不必要的资源消耗。线程池就是为了解决这个问题而生,需要时就从池中拿取,使用完毕就放回去,池化思想通过复用对象大大提高了系统的性能。线程池、数据库连接池、对象池等都采用了池化技术,下面我们就来学习下线程池的核心知识、面试重点~
46 4
Java线程池让使用线程变得更加高效
|
5天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
9天前
|
监控 Java 调度
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
|
9天前
|
设计模式 Java
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现
|
9天前
|
Java 测试技术
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义
|
9天前
|
存储 安全 Java
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
|
4天前
|
弹性计算 运维 Java
一键安装二进制JDK
【4月更文挑战第30天】
6 0
|
5天前
|
关系型数据库 MySQL 应用服务中间件
centos7在线安装jdk1.8+tomcat+mysql8+nginx+docker
现在,你已经成功在CentOS 7上安装了JDK 1.8、Tomcat、MySQL 8、Nginx和Docker。你可以根据需要配置和使用这些服务。请注意,安装和配置这些服务的详细设置取决于你的具体需求。
21 2
|
7天前
|
Java Windows
java——安装JDK及配置解决常见问题
java——安装JDK及配置解决常见问题
|
9天前
|
关系型数据库 MySQL Java
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
28 2