Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义

简介: Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义


前言

上节内容回顾:

Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列

在多线程程序中,线程的创建和销毁是一个频繁且代价高昂的操作。如果每次有新任务到来都创建一个新线程,将会导致系统资源的巨大浪费。为了更高效地利用线程资源,我们需要线程池来统一管理和复用线程。


线程池可以避免频繁创建和销毁线程的开销,提高系统的响应速度。同时,线程池还能够设置线程数量上限,防止无限制创建线程导致资源耗尽。因此,在高并发场景下,线程池是必不可少的重要工具。


在前面的文章中,我们讲解了任务堵塞队列的实现,这篇文章会基于任务堵塞队列实现一个简易的线程池,在后续的文章中,还会继续对本章编写的线程池进行功能扩展和优化。

1.线程池的设计

1.1.线程池的七大参数

corePoolSize——核心线程最大数


maximumPoolSize——线程池最大线程数


keepAliveTime——空闲线程存活时间。


unit——空闲线程存活时间单位


workQueue——等待队列(也就是我们上节完成的内容)


threadFactory——线程工厂


handler——拒绝策略

在上一章节我们已经实现了workQueue等待队列(原先我们的命名为BlockQueue,本节也跟着官方定义改为WorkQueue),本节内容会继续实现corePoolSize,keepAliveTime,unit,handler等基本参数的功能落地,threadFactory线程工厂和maximumPoolSize线程池最大线程数这些内容会在我们后续文章对线程池进行扩展补充时实现。

1.2.线程池的基本原理和工作流程

本章节线程池的基本原理如下:

新任务到达时,首先判断目前正在运行的线程数是否小于核心线程数。

如果小于,就创建工作线程去执行任务

如果大于等于,说明没有空闲的工作线程,我们将任务加入等待队列

每个工作线程执行完当前线程后都会继续尝试去等待队列中获取任务

如果工作线程超过我们规定的空闲线程存活时间,就会被回收

ps:JDK官方提供的线程池的线程回收只会回收非核心线程,本章节的实现的线程池是一个简易版,为了方便理解,没有分核心线程和非核心线程,全归类为工作线程,后续文章会继续扩展,加入线程工厂,核心线程和非核心线程等内容~

2.线程池对象的实现

2.1.核心属性和构造函数

创建线程池对象:

我们这里首先创建我们的线程池对象(可先定义线程池接口,如JDK官方的ExecutorService接口)

/**
 * @author Luckysj @刘仕杰
 * @description 自定义线程池对象
 * @create 2024/03/27 10:45:17
 */
@Slf4j
public class ThreadPool {
 
}


核心属性字段与构造函数定义:

其中workerSet这个集合用来存放我们正在运行的工作线程,Worker为我们封装的工作线程,后面会实现

    /** 任务等待队列 */
    private WorkQueue<Runnable> workQueue;
    /** 正在运行的工作线程集合 */
    private final Set<Worker> workerSet = new HashSet<>();
    /** 核心线程数 */
    private int corePoolSize;
    /** 最大等待时间(也就是线程的最大空闲时间) */
    private Long keepAliveTime;
    /** 等待时间单位 */
    private TimeUnit timeUnit;
 
    public ThreadPool(WorkQueue<Runnable> workQueue, int corePoolSize, Long keepAliveTime, TimeUnit timeUnit) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
    }



2.2.execute方法的实现

当用户创建了我们的线程池后,可以通过execute(task)来传入待执行的任务,这个方法是线程池比较核心的一个方法。


传入任务后,首先判断当前运行的线程是否小于我们规则的核心线程数,如果小于,那么就创建线程去执行该任务,如果大于,说明没有空闲线程了,我们需要加入任务等待队列中

public void execute(Runnable task){
        synchronized(workerSet){
            //1 判断当前运行的工作线程数是否小于核心线程数
            if(workerSet.size() <  corePoolSize){
                // 2.1 创建工作线程
                Worker worker = new Worker(task);
                // 2.2 加入运行线程集合
                workerSet.add(worker);
                // 2.3 运行线程
                worker.start();
 
            }else{
                // 2.1 尝试将任务加入阻塞队列中等待(put方法会一直堵塞等待,后面会改进)
                workQueue.put(task);
            }
        }
    }


2.3.Worker线程的实现

为了方便使用,我们封装工作线程Worker:

class Worker extends Thread{
        private Runnable task;
 
        public Worker(Runnable task) {
            this.task = task;
        }
 
        @Override
        public void run() {
            log.info("工作线程{}开始运行", Thread.currentThread());
 
            // 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用
            while(task != null || (task = workQueue.poll(keepAliveTime, timeUnit)) != null){
                try {
                    task.run();
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally {
                    // 执行完后清除任务
                    task = null;
                }
            }
 
            // 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧
            synchronized (workerSet){
                workerSet.remove(this);
            }
            log.info("线程{}超过最大空闲时间没有获取到任务,已被回收", Thread.currentThread());
        }
    }


Worker工作线程开始运行后,会进入while循环,首先消费当前任务,如果自身没有任务,就去等待队列中拿取任务,通过poll方法超时堵塞拿取,当超过keepAliveTime(最大空闲时间)没有拿到线程时,那么就会跳出循环,从工作线程集合中删除当前线程。


如上我们就完成了简易线程池对象的基本设计,但是如果任务很多,任务等待队列被装满了,后续添加任务会一直被堵塞,所以我们要引入拒绝策略,针对队列满了后来的任务进行一个特殊处理。

3.拒绝策略的设计

3.1.拒绝策略的作用


高并发的应用场景中,任务的到达速度可能会暂时超过线程池的处理能力,导致任务队列处于已满状态。这种情况下,如果仍然有新任务到来,线程池就需要采取一些策略来拒绝这些新任务,避免资源耗尽。


拒绝策略定义了线程池在任务队列已满时应当执行的操作,不同的拒绝策略会产生不同的效果。合理地设置拒绝策略,可以保证线程池在高负载情况下的稳定性,防止资源被无限制占用。


3.2.拒绝策略接口的定义

我们通过RejectPolicy接口来定义拒绝策略,它只有一个reject方法:

public interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}


其中:


queue是当前的任务队列

task是被拒绝的任务

不同的拒绝策略需要实现该接口,并在reject方法中定义具体的拒绝操作。用户可以在创建线程池时通过传入自定义的拒绝策略实现类,从而实现自定义的任务拒绝处理方案。本次只会编写一个简单的拒绝策略做测试,后续文章会继续扩展,实现几个默认的拒绝策略。

3.3.任务等待队列新增 尝试添加任务方法

我们为任务等待队列新增一个方法,如果任务添加失败就触发拒绝策略,我们后面会把这个拒绝策略触发动作写到线程池对象中,这里先这样写方便理解

// 尝试向队列添加任务,如果队列已满就触发拒绝策略
    public void tryPut(RejectPolicy<T> rejectPolicy, T task){
        lock.lock();
        try {
            if(deque.size() == size){
                // 队列满了就触发拒绝策略
                log.info("拒绝策略触发,当前任务:{}", task);
                rejectPolicy.reject(this, task);
            }else{
                // 队列没满就将任务加入队列
                log.debug("没有空闲线程,加入任务等待队列等待");
                deque.addLast(task);
                emptyCondition.signal();
            }
        }finally {
            lock.unlock();
        }
    }



这个方法需要传入当前的拒绝策略和待添加的任务,如果队列已经满了,就会触发拒绝策略

3.4.完善execute方法

在线程池对象的excute方法中,我们通过tryPut来向等待队列中添加任务

public void execute(Runnable task){
        synchronized(workerSet){
            //1 判断当前运行的工作线程数是否小于核心线程数
            if(workerSet.size() <  corePoolSize){
                // 2.1 创建工作线程
                Worker worker = new Worker(task);
                // 2.2 加入运行线程集合
                workerSet.add(worker);
                // 2.3 运行线程
                worker.start();
 
            }else{
                // 2.1 尝试将任务加入阻塞队列中等待,如果加入失败,触发拒绝策略
                workQueue.tryPut(rejectPolicy, task);
 
            }
        }
    }

4.功能测试

以上我们就完成了一个极简版的线程池,接下来我们会做一些测试来测试线程池能否正常使用。

定义测试类,传入长度为5的等待队列,核心线程池数为2,最大空闲时间为5S,拒绝策略为直接丢弃(使用了Lamda写法)。


执行四次打印任务,按照预期应该是后两次任务会加入等待队列等待。

@Slf4j
public class MainTest {
    public static void main(String[] args) {
 
        ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5L, TimeUnit.SECONDS,
                (queue, task) -> {
                    // 一直等
                    //queue.put(task);
                    // 调用者线程执行
                    //task.run();
                    // 直接抛出异常
                    // throw new RuntimeException("saa");
                    // 丢弃这个任务
                    log.debug("丢弃这个任务{}", task);
                });
 
       for (int i = 0; i < 4; i++) {
            threadPool.execute(() -> {
                System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
 
    }
}



执行结果如下,可以看到有两次任务加入到了等待队列中(多线程日志可能有时候存在顺序问题)

我们将执行次数改为10次,那么预期将会有三条任务被拒绝

总结

本章节基于上节的任务等待队列实现了一个简易线程池,实现了任务等待,线程复用与拒绝策略等功能,大体上来说,本次的多线程实战的手搓线程池部分就差不多完结了,后续有时间的话可能还会出一期功能完善篇,可以扩展的功能如下,小伙伴们也能在我架子上自行扩展:


定义线程工厂,通过线程工厂来创建核心线程和非核心线程

线程池会根据最大核心线程数和总线程数的情况来管理心线程和非核心线程

实现几个拒绝策略,并配置默认拒绝策略

线程池生命周期管理,提供start()、shutdown()、shutdownNow()等方法,允许用户主动控制线程池的启停。

添加任务执行回馈机制,当前版本的线程池无法获知已提交任务的执行状态和结果。比如支持Future/FutureTask,允许用户追踪任务状态、获取执行结果、取消任务等。

异常处理策略,当前对任务执行过程中的异常只是简单抛出,缺少统一的异常处理策略,可以考虑提供自定义的异常处理器接口,允许用户实现自己的异常处理逻辑,比如记录日志、任务重试等。

相关文章
|
8天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
4天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
23 9
|
7天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
4天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
21 3
|
6天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
7天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
17 1
|
7天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
8天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
34 1
|
11天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####