前言
上节内容回顾:
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
在多线程程序中,线程的创建和销毁是一个频繁且代价高昂的操作。如果每次有新任务到来都创建一个新线程,将会导致系统资源的巨大浪费。为了更高效地利用线程资源,我们需要线程池来统一管理和复用线程。
线程池可以避免频繁创建和销毁线程的开销,提高系统的响应速度。同时,线程池还能够设置线程数量上限,防止无限制创建线程导致资源耗尽。因此,在高并发场景下,线程池是必不可少的重要工具。
在前面的文章中,我们讲解了任务堵塞队列的实现,这篇文章会基于任务堵塞队列实现一个简易的线程池,在后续的文章中,还会继续对本章编写的线程池进行功能扩展和优化。
1.线程池的设计
1.1.线程池的七大参数
corePoolSize——核心线程最大数
maximumPoolSize——线程池最大线程数
keepAliveTime——空闲线程存活时间。
unit——空闲线程存活时间单位
workQueue——等待队列(也就是我们上节完成的内容)
threadFactory——线程工厂
handler——拒绝策略
在上一章节我们已经实现了workQueue等待队列(原先我们的命名为BlockQueue,本节也跟着官方定义改为WorkQueue),本节内容会继续实现corePoolSize,keepAliveTime,unit,handler等基本参数的功能落地,threadFactory线程工厂和maximumPoolSize线程池最大线程数这些内容会在我们后续文章对线程池进行扩展补充时实现。
1.2.线程池的基本原理和工作流程
本章节线程池的基本原理如下:
新任务到达时,首先判断目前正在运行的线程数是否小于核心线程数。
如果小于,就创建工作线程去执行任务
如果大于等于,说明没有空闲的工作线程,我们将任务加入等待队列
每个工作线程执行完当前线程后都会继续尝试去等待队列中获取任务
如果工作线程超过我们规定的空闲线程存活时间,就会被回收
ps:JDK官方提供的线程池的线程回收只会回收非核心线程,本章节的实现的线程池是一个简易版,为了方便理解,没有分核心线程和非核心线程,全归类为工作线程,后续文章会继续扩展,加入线程工厂,核心线程和非核心线程等内容~
2.线程池对象的实现
2.1.核心属性和构造函数
创建线程池对象:
我们这里首先创建我们的线程池对象(可先定义线程池接口,如JDK官方的ExecutorService接口)
/** * @author Luckysj @刘仕杰 * @description 自定义线程池对象 * @create 2024/03/27 10:45:17 */ @Slf4j public class ThreadPool { }
核心属性字段与构造函数定义:
其中workerSet这个集合用来存放我们正在运行的工作线程,Worker为我们封装的工作线程,后面会实现
/** 任务等待队列 */ private WorkQueue<Runnable> workQueue; /** 正在运行的工作线程集合 */ private final Set<Worker> workerSet = new HashSet<>(); /** 核心线程数 */ private int corePoolSize; /** 最大等待时间(也就是线程的最大空闲时间) */ private Long keepAliveTime; /** 等待时间单位 */ private TimeUnit timeUnit; public ThreadPool(WorkQueue<Runnable> workQueue, int corePoolSize, Long keepAliveTime, TimeUnit timeUnit) { this.workQueue = workQueue; this.corePoolSize = corePoolSize; this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; }
2.2.execute方法的实现
当用户创建了我们的线程池后,可以通过execute(task)来传入待执行的任务,这个方法是线程池比较核心的一个方法。
传入任务后,首先判断当前运行的线程是否小于我们规则的核心线程数,如果小于,那么就创建线程去执行该任务,如果大于,说明没有空闲线程了,我们需要加入任务等待队列中
public void execute(Runnable task){ synchronized(workerSet){ //1 判断当前运行的工作线程数是否小于核心线程数 if(workerSet.size() < corePoolSize){ // 2.1 创建工作线程 Worker worker = new Worker(task); // 2.2 加入运行线程集合 workerSet.add(worker); // 2.3 运行线程 worker.start(); }else{ // 2.1 尝试将任务加入阻塞队列中等待(put方法会一直堵塞等待,后面会改进) workQueue.put(task); } } }
2.3.Worker线程的实现
为了方便使用,我们封装工作线程Worker:
class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { log.info("工作线程{}开始运行", Thread.currentThread()); // 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用 while(task != null || (task = workQueue.poll(keepAliveTime, timeUnit)) != null){ try { task.run(); }catch (Exception e){ throw new RuntimeException(e); }finally { // 执行完后清除任务 task = null; } } // 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧 synchronized (workerSet){ workerSet.remove(this); } log.info("线程{}超过最大空闲时间没有获取到任务,已被回收", Thread.currentThread()); } }
Worker工作线程开始运行后,会进入while循环,首先消费当前任务,如果自身没有任务,就去等待队列中拿取任务,通过poll方法超时堵塞拿取,当超过keepAliveTime(最大空闲时间)没有拿到线程时,那么就会跳出循环,从工作线程集合中删除当前线程。
如上我们就完成了简易线程池对象的基本设计,但是如果任务很多,任务等待队列被装满了,后续添加任务会一直被堵塞,所以我们要引入拒绝策略,针对队列满了后来的任务进行一个特殊处理。
3.拒绝策略的设计
3.1.拒绝策略的作用
高并发的应用场景中,任务的到达速度可能会暂时超过线程池的处理能力,导致任务队列处于已满状态。这种情况下,如果仍然有新任务到来,线程池就需要采取一些策略来拒绝这些新任务,避免资源耗尽。
拒绝策略定义了线程池在任务队列已满时应当执行的操作,不同的拒绝策略会产生不同的效果。合理地设置拒绝策略,可以保证线程池在高负载情况下的稳定性,防止资源被无限制占用。
3.2.拒绝策略接口的定义
我们通过RejectPolicy接口来定义拒绝策略,它只有一个reject方法:
public interface RejectPolicy<T> { void reject(BlockQueue<T> queue, T task); }
其中:
queue是当前的任务队列
task是被拒绝的任务
不同的拒绝策略需要实现该接口,并在reject方法中定义具体的拒绝操作。用户可以在创建线程池时通过传入自定义的拒绝策略实现类,从而实现自定义的任务拒绝处理方案。本次只会编写一个简单的拒绝策略做测试,后续文章会继续扩展,实现几个默认的拒绝策略。
3.3.任务等待队列新增 尝试添加任务方法
我们为任务等待队列新增一个方法,如果任务添加失败就触发拒绝策略,我们后面会把这个拒绝策略触发动作写到线程池对象中,这里先这样写方便理解
// 尝试向队列添加任务,如果队列已满就触发拒绝策略 public void tryPut(RejectPolicy<T> rejectPolicy, T task){ lock.lock(); try { if(deque.size() == size){ // 队列满了就触发拒绝策略 log.info("拒绝策略触发,当前任务:{}", task); rejectPolicy.reject(this, task); }else{ // 队列没满就将任务加入队列 log.debug("没有空闲线程,加入任务等待队列等待"); deque.addLast(task); emptyCondition.signal(); } }finally { lock.unlock(); } }
这个方法需要传入当前的拒绝策略和待添加的任务,如果队列已经满了,就会触发拒绝策略
3.4.完善execute方法
在线程池对象的excute方法中,我们通过tryPut来向等待队列中添加任务
public void execute(Runnable task){ synchronized(workerSet){ //1 判断当前运行的工作线程数是否小于核心线程数 if(workerSet.size() < corePoolSize){ // 2.1 创建工作线程 Worker worker = new Worker(task); // 2.2 加入运行线程集合 workerSet.add(worker); // 2.3 运行线程 worker.start(); }else{ // 2.1 尝试将任务加入阻塞队列中等待,如果加入失败,触发拒绝策略 workQueue.tryPut(rejectPolicy, task); } } }
4.功能测试
以上我们就完成了一个极简版的线程池,接下来我们会做一些测试来测试线程池能否正常使用。
定义测试类,传入长度为5的等待队列,核心线程池数为2,最大空闲时间为5S,拒绝策略为直接丢弃(使用了Lamda写法)。
执行四次打印任务,按照预期应该是后两次任务会加入等待队列等待。
@Slf4j public class MainTest { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5L, TimeUnit.SECONDS, (queue, task) -> { // 一直等 //queue.put(task); // 调用者线程执行 //task.run(); // 直接抛出异常 // throw new RuntimeException("saa"); // 丢弃这个任务 log.debug("丢弃这个任务{}", task); }); for (int i = 0; i < 4; i++) { threadPool.execute(() -> { System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString()); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } } }
执行结果如下,可以看到有两次任务加入到了等待队列中(多线程日志可能有时候存在顺序问题)
我们将执行次数改为10次,那么预期将会有三条任务被拒绝
总结
本章节基于上节的任务等待队列实现了一个简易线程池,实现了任务等待,线程复用与拒绝策略等功能,大体上来说,本次的多线程实战的手搓线程池部分就差不多完结了,后续有时间的话可能还会出一期功能完善篇,可以扩展的功能如下,小伙伴们也能在我架子上自行扩展:
定义线程工厂,通过线程工厂来创建核心线程和非核心线程
线程池会根据最大核心线程数和总线程数的情况来管理心线程和非核心线程
实现几个拒绝策略,并配置默认拒绝策略
线程池生命周期管理,提供start()、shutdown()、shutdownNow()等方法,允许用户主动控制线程池的启停。
添加任务执行回馈机制,当前版本的线程池无法获知已提交任务的执行状态和结果。比如支持Future/FutureTask,允许用户追踪任务状态、获取执行结果、取消任务等。
异常处理策略,当前对任务执行过程中的异常只是简单抛出,缺少统一的异常处理策略,可以考虑提供自定义的异常处理器接口,允许用户实现自己的异常处理逻辑,比如记录日志、任务重试等。