Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义

简介: Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义


前言

上节内容回顾:

Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列

在多线程程序中,线程的创建和销毁是一个频繁且代价高昂的操作。如果每次有新任务到来都创建一个新线程,将会导致系统资源的巨大浪费。为了更高效地利用线程资源,我们需要线程池来统一管理和复用线程。


线程池可以避免频繁创建和销毁线程的开销,提高系统的响应速度。同时,线程池还能够设置线程数量上限,防止无限制创建线程导致资源耗尽。因此,在高并发场景下,线程池是必不可少的重要工具。


在前面的文章中,我们讲解了任务堵塞队列的实现,这篇文章会基于任务堵塞队列实现一个简易的线程池,在后续的文章中,还会继续对本章编写的线程池进行功能扩展和优化。

1.线程池的设计

1.1.线程池的七大参数

corePoolSize——核心线程最大数


maximumPoolSize——线程池最大线程数


keepAliveTime——空闲线程存活时间。


unit——空闲线程存活时间单位


workQueue——等待队列(也就是我们上节完成的内容)


threadFactory——线程工厂


handler——拒绝策略

在上一章节我们已经实现了workQueue等待队列(原先我们的命名为BlockQueue,本节也跟着官方定义改为WorkQueue),本节内容会继续实现corePoolSize,keepAliveTime,unit,handler等基本参数的功能落地,threadFactory线程工厂和maximumPoolSize线程池最大线程数这些内容会在我们后续文章对线程池进行扩展补充时实现。

1.2.线程池的基本原理和工作流程

本章节线程池的基本原理如下:

新任务到达时,首先判断目前正在运行的线程数是否小于核心线程数。

如果小于,就创建工作线程去执行任务

如果大于等于,说明没有空闲的工作线程,我们将任务加入等待队列

每个工作线程执行完当前线程后都会继续尝试去等待队列中获取任务

如果工作线程超过我们规定的空闲线程存活时间,就会被回收

ps:JDK官方提供的线程池的线程回收只会回收非核心线程,本章节的实现的线程池是一个简易版,为了方便理解,没有分核心线程和非核心线程,全归类为工作线程,后续文章会继续扩展,加入线程工厂,核心线程和非核心线程等内容~

2.线程池对象的实现

2.1.核心属性和构造函数

创建线程池对象:

我们这里首先创建我们的线程池对象(可先定义线程池接口,如JDK官方的ExecutorService接口)

/**
 * @author Luckysj @刘仕杰
 * @description 自定义线程池对象
 * @create 2024/03/27 10:45:17
 */
@Slf4j
public class ThreadPool {
 
}


核心属性字段与构造函数定义:

其中workerSet这个集合用来存放我们正在运行的工作线程,Worker为我们封装的工作线程,后面会实现

    /** 任务等待队列 */
    private WorkQueue<Runnable> workQueue;
    /** 正在运行的工作线程集合 */
    private final Set<Worker> workerSet = new HashSet<>();
    /** 核心线程数 */
    private int corePoolSize;
    /** 最大等待时间(也就是线程的最大空闲时间) */
    private Long keepAliveTime;
    /** 等待时间单位 */
    private TimeUnit timeUnit;
 
    public ThreadPool(WorkQueue<Runnable> workQueue, int corePoolSize, Long keepAliveTime, TimeUnit timeUnit) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
    }



2.2.execute方法的实现

当用户创建了我们的线程池后,可以通过execute(task)来传入待执行的任务,这个方法是线程池比较核心的一个方法。


传入任务后,首先判断当前运行的线程是否小于我们规则的核心线程数,如果小于,那么就创建线程去执行该任务,如果大于,说明没有空闲线程了,我们需要加入任务等待队列中

public void execute(Runnable task){
        synchronized(workerSet){
            //1 判断当前运行的工作线程数是否小于核心线程数
            if(workerSet.size() <  corePoolSize){
                // 2.1 创建工作线程
                Worker worker = new Worker(task);
                // 2.2 加入运行线程集合
                workerSet.add(worker);
                // 2.3 运行线程
                worker.start();
 
            }else{
                // 2.1 尝试将任务加入阻塞队列中等待(put方法会一直堵塞等待,后面会改进)
                workQueue.put(task);
            }
        }
    }


2.3.Worker线程的实现

为了方便使用,我们封装工作线程Worker:

class Worker extends Thread{
        private Runnable task;
 
        public Worker(Runnable task) {
            this.task = task;
        }
 
        @Override
        public void run() {
            log.info("工作线程{}开始运行", Thread.currentThread());
 
            // 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用
            while(task != null || (task = workQueue.poll(keepAliveTime, timeUnit)) != null){
                try {
                    task.run();
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally {
                    // 执行完后清除任务
                    task = null;
                }
            }
 
            // 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧
            synchronized (workerSet){
                workerSet.remove(this);
            }
            log.info("线程{}超过最大空闲时间没有获取到任务,已被回收", Thread.currentThread());
        }
    }


Worker工作线程开始运行后,会进入while循环,首先消费当前任务,如果自身没有任务,就去等待队列中拿取任务,通过poll方法超时堵塞拿取,当超过keepAliveTime(最大空闲时间)没有拿到线程时,那么就会跳出循环,从工作线程集合中删除当前线程。


如上我们就完成了简易线程池对象的基本设计,但是如果任务很多,任务等待队列被装满了,后续添加任务会一直被堵塞,所以我们要引入拒绝策略,针对队列满了后来的任务进行一个特殊处理。

3.拒绝策略的设计

3.1.拒绝策略的作用


高并发的应用场景中,任务的到达速度可能会暂时超过线程池的处理能力,导致任务队列处于已满状态。这种情况下,如果仍然有新任务到来,线程池就需要采取一些策略来拒绝这些新任务,避免资源耗尽。


拒绝策略定义了线程池在任务队列已满时应当执行的操作,不同的拒绝策略会产生不同的效果。合理地设置拒绝策略,可以保证线程池在高负载情况下的稳定性,防止资源被无限制占用。


3.2.拒绝策略接口的定义

我们通过RejectPolicy接口来定义拒绝策略,它只有一个reject方法:

public interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}


其中:


queue是当前的任务队列

task是被拒绝的任务

不同的拒绝策略需要实现该接口,并在reject方法中定义具体的拒绝操作。用户可以在创建线程池时通过传入自定义的拒绝策略实现类,从而实现自定义的任务拒绝处理方案。本次只会编写一个简单的拒绝策略做测试,后续文章会继续扩展,实现几个默认的拒绝策略。

3.3.任务等待队列新增 尝试添加任务方法

我们为任务等待队列新增一个方法,如果任务添加失败就触发拒绝策略,我们后面会把这个拒绝策略触发动作写到线程池对象中,这里先这样写方便理解

// 尝试向队列添加任务,如果队列已满就触发拒绝策略
    public void tryPut(RejectPolicy<T> rejectPolicy, T task){
        lock.lock();
        try {
            if(deque.size() == size){
                // 队列满了就触发拒绝策略
                log.info("拒绝策略触发,当前任务:{}", task);
                rejectPolicy.reject(this, task);
            }else{
                // 队列没满就将任务加入队列
                log.debug("没有空闲线程,加入任务等待队列等待");
                deque.addLast(task);
                emptyCondition.signal();
            }
        }finally {
            lock.unlock();
        }
    }



这个方法需要传入当前的拒绝策略和待添加的任务,如果队列已经满了,就会触发拒绝策略

3.4.完善execute方法

在线程池对象的excute方法中,我们通过tryPut来向等待队列中添加任务

public void execute(Runnable task){
        synchronized(workerSet){
            //1 判断当前运行的工作线程数是否小于核心线程数
            if(workerSet.size() <  corePoolSize){
                // 2.1 创建工作线程
                Worker worker = new Worker(task);
                // 2.2 加入运行线程集合
                workerSet.add(worker);
                // 2.3 运行线程
                worker.start();
 
            }else{
                // 2.1 尝试将任务加入阻塞队列中等待,如果加入失败,触发拒绝策略
                workQueue.tryPut(rejectPolicy, task);
 
            }
        }
    }

4.功能测试

以上我们就完成了一个极简版的线程池,接下来我们会做一些测试来测试线程池能否正常使用。

定义测试类,传入长度为5的等待队列,核心线程池数为2,最大空闲时间为5S,拒绝策略为直接丢弃(使用了Lamda写法)。


执行四次打印任务,按照预期应该是后两次任务会加入等待队列等待。

@Slf4j
public class MainTest {
    public static void main(String[] args) {
 
        ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5L, TimeUnit.SECONDS,
                (queue, task) -> {
                    // 一直等
                    //queue.put(task);
                    // 调用者线程执行
                    //task.run();
                    // 直接抛出异常
                    // throw new RuntimeException("saa");
                    // 丢弃这个任务
                    log.debug("丢弃这个任务{}", task);
                });
 
       for (int i = 0; i < 4; i++) {
            threadPool.execute(() -> {
                System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
 
    }
}



执行结果如下,可以看到有两次任务加入到了等待队列中(多线程日志可能有时候存在顺序问题)

我们将执行次数改为10次,那么预期将会有三条任务被拒绝

总结

本章节基于上节的任务等待队列实现了一个简易线程池,实现了任务等待,线程复用与拒绝策略等功能,大体上来说,本次的多线程实战的手搓线程池部分就差不多完结了,后续有时间的话可能还会出一期功能完善篇,可以扩展的功能如下,小伙伴们也能在我架子上自行扩展:


定义线程工厂,通过线程工厂来创建核心线程和非核心线程

线程池会根据最大核心线程数和总线程数的情况来管理心线程和非核心线程

实现几个拒绝策略,并配置默认拒绝策略

线程池生命周期管理,提供start()、shutdown()、shutdownNow()等方法,允许用户主动控制线程池的启停。

添加任务执行回馈机制,当前版本的线程池无法获知已提交任务的执行状态和结果。比如支持Future/FutureTask,允许用户追踪任务状态、获取执行结果、取消任务等。

异常处理策略,当前对任务执行过程中的异常只是简单抛出,缺少统一的异常处理策略,可以考虑提供自定义的异常处理器接口,允许用户实现自己的异常处理逻辑,比如记录日志、任务重试等。

相关文章
|
3天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
3天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
7天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。
|
7天前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
4天前
|
Java 调度 开发者
Java中的多线程基础及其应用
【9月更文挑战第13天】本文将深入探讨Java中的多线程概念,从基本理论到实际应用,带你一步步了解如何有效使用多线程来提升程序的性能。我们将通过实际代码示例,展示如何在Java中创建和管理线程,以及如何利用线程池优化资源管理。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和技巧,帮助你更好地理解和应用多线程编程。
|
8天前
|
安全 Java UED
Java并发编程:解锁多线程的潜力
在Java的世界里,并发编程如同一场精心编排的交响乐,每个线程扮演着不同的乐手,共同奏响性能与效率的和声。本文将引导你走进Java并发编程的大门,探索如何在多核处理器上优雅地舞动多线程,从而提升应用的性能和响应性。我们将从基础概念出发,逐步深入到高级技巧,让你的代码在并行处理的海洋中乘风破浪。
|
22天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
46 1
|
5天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
24 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
7天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
24 10
|
14天前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。