1.前言
1.1.内容回顾
往期文章传送门:
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
Java多线程实战-从零手搓一个简易线程池(二)线程池与拒绝策略实现
在上一节我们实现了线程池内部的基本运转逻辑,池化了线程资源进行任务处理,细心的同学可以发现,我们上章没有划分核心线程与非核心线程的概念,在JDK官方的提供的线程池中,线程池中的线程从概念上分为核心线程和非核心线程,核心线程是线程池中长久存在的线程,默认不会被回收,而非核心线程在空闲时间超过设置的最大空闲时间时会被回收,当然,我们也可以通过设置一个属性来运行核心线程被回收。
1.2.本节任务
本章节的任务如下:
- 实现线程工厂
- 实现核心线程与非核心线程
2.实现思路
2.1 线程工厂实现思路
线程工厂是运用了工厂设计模式,可以帮助我们隐藏创建线程的一些细节。我们可以通过线程工厂在创建线程数时定义线程的一些属性,如线程名称、线程组等。实现线程工厂一般有以下步骤:
定义一个线程工厂接口或抽象类,提供创建新线程的方法。
实现该接口或继承该抽象类,重写创建线程的方法逻辑。
在线程池的构造函数中,传入自定义的线程工厂实例。
整体实现还是比较简单,主要就是要注意编码规范
2.2 核心线程与非核心线程实现思路
这里首先要清楚一个概念,JDK线程池源码中没有显式的区别核心线程和非核心线程,他只是线程池在处理线程池不同情况下的线程的一种概念。我们接下来从源码分析(JDK1.8)是如何实现核心线程和非核心线程的管理的。
JDK官方线程池中的runWorker方法作用是用来执行worker线程
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 线程执行任务流程,省流 } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
同我们上节运行线程一样,他会通过while (task != null || (task = getTask()) != null)来重复获取任务,如果task == null,也就是没获取到,会进入到processWorkerExit函数中,线程会被回收。也就是说,只要getTask方法返回为null,就代表了当前线程需要回收,所以我们接下来重点查看getTask方法的源码:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 1.方法内部使用了一个无限循环for (;;),这意味着线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件。 for (;;) { // 2.获取到目前线程池的线程数,最大核心线程,最大总线程数等信息 int c = ctl.get(); int rs = runStateOf(c); // 3.如果线程池的运行状态至少为SHUTDOWN(在此状态以上的状态,都不会接受新任务了,所以我们直接返回null) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c);//获取线程池当前线程数量 // 4.根据当前线程数动态判断是否要回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask方法主要负责从workQueue队列中获取任务,如果获取到了就返回任务,如果没有获取到就返回null。他会根据线程池的当前状态,当前线程数,来动态的选择是否从workQueue中拿取任务,以及拿取操作是否是超时操作。这里的设计特别巧妙,建议阅读源码仔细体会
如果 当前线程数 > 最大核心线程数,我们就判定存在非核心线程,可以进行回收判断
如果 当前线程数 < 最大线程数,我们就判定不存在核心线程
所以核心线程和非核心线程他们都是一类线程,只是在线程池不同情况下划分的概念而已
3.代码实现
3.1.线程池工厂实现
3.1.1.线程工厂接口
/** * @author Luckysj @刘仕杰 * @description 线程工厂接口 * @create 2024/03/28 20:40:18 */ public interface ThreadFactory { /** * @description * @param * @return 创建的线程对象 * @date 2024/03/28 21:01:35 */ Thread newThread(Runnable r); }
3.1.2.默认线程工厂实现类
默认线程工厂实现类主要是设置新建线程的线程组,线程名前缀等等信息,更加规范,方便后续日志排查错误
/** * @author Luckysj @刘仕杰 * @description 默认线程工厂,我们这里仿照源码写法,为每个线程分配线程组(默认会自动分配),并为每个线程组 * @create 2024/03/28 21:27:10 */ public class DefaultThreadFactory implements ThreadFactory{ /** 原子序号类,我们可以通过该类为线程工厂来获取一个随机序号,主要是为了区分不同线程池实例*/ private static final AtomicInteger poolNumber = new AtomicInteger(1); /** 线程组,每个线程都需要属于一个线程组(平常使用未指定线程组会默认分配)*/ private final ThreadGroup group; /** 原子序号类,我们可以通过该类为每个线程来获取一个随机序号*/ private static final AtomicInteger threadNumber = new AtomicInteger(1); /** 线程名前缀,以便于在日志、监控等场景下识别和管理线程。*/ private final String namePrefix; public DefaultThreadFactory() { // 获取管理安全策略的类,通过这个类我们可以获取对应名称的线程组,SecurityManager 和 group 的存在是为了更好地控制线程的安全性和权限 SecurityManager s = System.getSecurityManager(); // 存在 SecurityManager实例,则通过 s.getThreadGroup() 获取一个受限制的线程组。 // 如果不存在 SecurityManager 实例,则使用当前线程所在的线程组 Thread.currentThread().getThreadGroup()。 this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 生成前缀 this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); // 将线程设置为用户线程 if(thread.isDaemon()){ thread.setDaemon(false); } // 为线程设置默认优先级 if(thread.getPriority() != Thread.NORM_PRIORITY){ thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
3.1.3.使用线程工厂
在Worker工作线程构造函数中使用工厂创建线程
class Worker implements Runnable{ private Runnable firstTask; private Thread thread; public Worker(Runnable task) { this.firstTask = task; this.thread = threadFactory.newThread(this); } // 省略 }
3.2核心线程与非核心线程逻辑
3.2.1.编写getTask方法
getTask方法会根据线程池情况动态从任务队列中获取任务
/** * @description 从等待队列中获取任务 * @return Runnable 待执行的任务,没有获取到会返回null * @date 2024/04/02 10:46:37 */ public Runnable getTask(){ //我们使用一个变量来记录上次循环获取任务是否超时 boolean preIsTimeOut = false; // 内部使用一个while循环,线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件 while(true){ // 获取线程池当前线程数量 int wc = threadTotalNums.get(); // 1.是否要进行核心线程回收操作,当allowCoreThreadTimeOut为true,或者当前线程池数大于核心线程数时,我们需要进行回收判断 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2.根据情况动态调整线程数,以下情况需要直接返回null(返回null就会回收线程): // (1)当前线程大于最大线程数(就是超过规定大小了),且任务队列为空且存在工作线程 // (2)timed为true,上次任务超时了(preIsTimeOut = true),且任务队列为空且存在工作 if ( (wc > maximumPoolSize || (timed && preIsTimeOut)) && (wc > 1 || workQueue.isEmpty()) ) { return null; } // 3.根据timed这个条件来选择是超时堵塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 获取任务超时了,将preIsTimeOut设为true,下次可以执行回收 preIsTimeOut = true; } }
timed 变量决定了线程从等待队列中拿取任务的方式,如果当前线程数大于最大核心线程数,或者开启了允许核心线程回收(allowCoreThreadTimeOut = true),我们就超时拿取,这样如果拿取任务超时就会返回null,线程就会被回收
3.2.2.调整Worker工作线程的run方法
将原来直接从任务队列中获取任务改为通过getTask方法获取
@Override public void run() { log.info("工作线程====》工作线程{}开始运行", Thread.currentThread()); // 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用 while(firstTask != null || (firstTask = getTask()) != null){ try { firstTask.run(); }catch (Exception e){ throw new RuntimeException(e); }finally { // 执行完后清除任务 firstTask = null; } } // 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧 synchronized (workerSet){ workerSet.remove(this); threadTotalNums.decrementAndGet(); //计数扣减 } log.info("工作线程====》线程{}已被回收,当前线程数:{}", Thread.currentThread(), threadTotalNums.get()); }
3.2.3.编写addWorker方法
/** * @description 添加工作线程 * @param firstTask 线程第一次执行的任务 * @param isCore 是否为核心线程 * @return Boolean 线程是否添加成功 * @date 2024/04/02 10:42:43 */ public Boolean addWorker(Runnable firstTask, Boolean isCore){ if(firstTask == null) { throw new NullPointerException(); } // TODO 1.我们在添加线程时,首先可以进行一些与线程池生命周期相关的校验,比如在一些状态下,不允许再添加任务 // 2.根据当前线程池和isCore条件判断是否需要创建 int wc = threadTotalNums.get(); if (wc >= (isCore ? corePoolSize : maximumPoolSize)) return false; // 3.创建线程,并添加到线程集合中 Worker worker = new Worker(firstTask); Thread t = worker.thread; if(t != null){ synchronized (workerSet){ workerSet.add(worker); threadTotalNums.getAndIncrement(); } t.start(); return true; } return false; }
3.2.4.完善excute方法
流程如下:
.如果当前线程数小于核心线程,直接创建核心线程去运行
2.线程数大于核心线程,我们就将任务加入等待队列
3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略
public void execute(Runnable task){ if(task == null){ throw new NullPointerException("传递的Runnable任务为Null"); } // 1.如果当前线程数小于核心线程,直接创建线程去运行 if(threadTotalNums.get() < corePoolSize){ if(addWorker(task, true)) return; } // 2.线程数大于核心线程,我们就将任务加入等待队列 if(workQueue.offer(task)){ return; } // 3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略 else if(!addWorker(task, false)){ reject(task); } }
4.测试
编写如下测试代码,我们会创建一个核心线程数为2,最大线程数为5,等待队列长度为5的线程池,并添加15个任务到线程池中,按照预期会有五个任务触发拒绝策略,在任务执行完成后只保留两个核心线程
@Slf4j public class MainTest { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5,5L, TimeUnit.SECONDS, (queue, task) -> { log.info("拒绝策略====》拒绝策略触发,直接丢弃当前任务"); }, new DefaultThreadFactory()); threadPool.setAllowCoreThreadTimeOut(false); //不回收核心线程 for (int i = 0; i < 15; i++) { threadPool.execute(() -> { System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString()); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } // ExecutorService executorService = Executors.newFixedThreadPool(2); } }
运行结果如下:
可以看到运行结果符合预期,任务也被正常消费
我们设置AllowCoreThreadTimeOut的属性为true,再次进行测试,
threadPool.setAllowCoreThreadTimeOut(true); //回收核心线程
结果输出:
可以看到,核心线程也会被回收,符合预期。
5.总结
在本章节中我们通过学习JDK线程池源码中的部分代码,实现了一个简易版带有核心线程与非核心线程处理逻辑的线程池,我们可以通过指定AllowCoreThreadTimeOut属性来设置是否允许核心线程的回收,默认只会回收非核心线程。线程池的官方源码还是写得相当巧妙的,阅读难度也不高,推荐小伙伴学习~