【多线程:线程池】自定义线程池
01.介绍
建议:学习本文章前最好对生产者消费者模式熟悉,可以看我之前的文章https://blog.csdn.net/m0_71229547/article/details/125435005
这个图就是自定义线程池的实现结构,它是用生产者消费者模式 实现的,它有三个部分 Thread Pool、Blocking Queue、main 组成。
Thread Pool是线程池 它的主要作用是 进行线程的创建与任务的执行 以及 把超过线程池部分的任务交给主线程进行拒绝策略处理,我们可以把线程池中的线程当做消费者。
main是任务的生产者,主要作用是 任务的生产、线程池的创建、规定拒绝策略、执行拒绝策略
Blocking Queue是阻塞队列,包括两个功能 生产者生产任务 放入任务队列、消费者获取任务 从任务队列中取出。当任务数大于线程数时 把任务放入到任务队列 假如任务队列也放满了 则wait生产者生产 然后如果之后消费者进行了消费 则唤醒生产者 然后执行生产。当任务数为空时 消费者会持续获取任务队列里的任务 具体表现是 wait消费者线程 当有任务时唤醒 然后执行消费。
02.具体实现与解释
实现
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,
1000, TimeUnit.MILLISECONDS, 2, (queue, task)->{
// 1. 死等
queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
for (int i = 0; i < 4; i++) {
int j = i;
System.out.println(i);
threadPool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}
}
@FunctionalInterface // 拒绝策略,函数式接口
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
// 统一时间格式
private TimeUnit timeUnit;
// 拒接策略
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{}, {}", worker, task);
workers.add(worker);
worker.start();
} else {
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// 这种处理是如果没有任务我们就死等,也可以理解为 这个线程始终在线程池内,等待任务的出现
// while(task != null || (task = taskQueue.take()) != null) {
// 这种处理方式是如果在规定时间内没有任务,我们就任务现在没有任务了 然后 移除线程,
// 也可以理解为在规定时间内这个线程在线程池中 超过一定时间我们就这个线程从线程池中移除
while(task != null || (task = taskQueue.pull(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时阻塞获取
public T pull(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if(queue.size() == capcity) {
rejectPolicy.reject(this, task);
} else { // 有空闲
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
解释
可以看出来代码量还是很大的,我们逐个分析每个部分。
BlockingQueue
这个类的阻塞队列类,比较重要的是put方法与take方法。
put方法的主要作用是生产者线程生产的任务大于了线程池容量 然后生产者线程把多出的任务通过put方法放到了任务队列里,如果任务超出了任务队列的大小 则wait生产者线程 不再生产 直到有消费者线程执行了任务队列的任务 此时唤醒生产者线程继续生产。
take方法的主要作用是如果此时线程池中的消费者线程执行完了自己的任务 然后就会通过take方法从任务队列中获取任务 并且从任务队列中移除 如果此时获取不到任务 说明任务队列为空 然后wait当前这个消费者线程 直到任务队列重新有任务 唤醒消费者线程继续消费任务。
put方法与offer方法的对比:put方法是如果任务队列已经满了 我们就生产线程还会生产一个任务 只不过这个任务陷入了等待 直到任务队列不满时才会重新执行。offer方法是有时限的添加任务 如果此时任务队列满了 我们生产线程还会生产一个任务 只不过这个任务会进入有时限的等待 如果超过这个时间我们就放弃这个任务 转而执行下一个任务。
take方法与pull方法:take方法是如果任务队列为空 则此消费线程此时获取不到任务 然后陷入等待 直到任务队列不为空 然后获取到这个任务并把这个任务从任务队列中移除,可以理解为这个线程一直在线程池内等待新的任务的到来。pull方法是如果任务队列为空 此时消费线程此时获取不到任务 然后会进入有时限的等待 如果超过这个时限还没有获取到任务 则把这个线程从线程池中移除。
ThreadPool
这个类是线程池类,需要关注execute方法与有参构造
有参构造:参数是线程池需要的配置,有coreSize 线程核心数(线程池大小)、queueCapcity 任务队列大小 用来创建任务队列时传参、timeout take方法的超时时间、timeUnit 统一时间配置、rejectPolicy拒绝策略
execute方法是执行任务的方法,线程数没有超过核心数时:我们 创建线程 把线程加入线程集合 并启动线程执行run方法 run方法通过take/pull方法从任务队列里获取任务 如果获取到任务就执行任务 如果没有获取到任务就等待。线程数超过核心数时:说明我们的任务数量已经大于线程数了 此时又因为我们的线程池已经满了 所以转而由生产线程执行拒绝策略。
RejectPolicy
拒绝策略函数式接口,需要关注它的实现,它的作用是当任务数大于核心数时 此时消费线程不能再执行多出的任务,此时生产线程执行拒绝策略 具体讨论这部分的任务应该怎么处理。
实现:拒绝策略是在主线程创建线程池对象时写的,在代码中我们也能看到 它分为五种处理策略,==死等==:可以看出它执行的是put方法 也就是多出的任务放入任务队列 直到任务队列满 不过如果此时仍然有多出任务 就会一直等待任务队列到不满时 然后等待的任务加入任务队列。==带超时等待==:也就是执行offer方法 和put方法基本一样 只不过在任务队列满了后 进入有时限的等待 如果超过时间则放弃这个任务 执行下一个任务。==让调用者放弃任务执行==:也就是如果任务数超过核心数 且任务队列已满,超出部分的任务生产者直接放弃。==让调用者抛出异常==:如果任务数超过核心数 且任务队列已满,生产者直接抛出异常。==让调用者自己执行任务==:如果任务数超过核心数 且任务队列已满,生产者自己把这个任务执行了。
03.不同情况下线程池的执行情况
我们以下结果的拒绝策略都为死等,消费线程获取任务队列用的是pull方法
核心数+任务队列=任务的数量
我们把核心数设置为2 任务队列设置为2 任务数量设置为4
结果
解释
上来先新增两个任务0 1,然后直接被Thread0 Thread1执行了,多出的两个任务2 3我们放入了任务队列 也就是图片里的加入任务队列,1s后 线程池的两个线程执行任务完毕 从线程池中获取新的任务,所以此时2 3任务被从任务队列中取出 然后Thread0 Thread1执行 2 3任务,最后 因为我们用的是pull方法有时限获取在1s后没有获取到任务 所以把Thread0 Thread1移除线程池
核心数+任务队列<任务的数量
我们把核心数设置为2 任务队列设置为2 任务数量设置为6
结果
任务数>线程数+任务队列,又因为我们每一个任务都要执行1s,所以势必会有任务等待加入任务队列,我们看上图,可以发现0 1任务被执行 2 3任务进入任务队列 4任务等待加入任务队列,因为此时生产者线程陷入等待 所以任务5就没有被生产出来,当0 1任务执行完后,Thread0 线程执行任务2 任务4加入任务队列 此时任务队列又满了 所以任务5此时等待加入任务队列,然后Thread1执行任务3 然后任务5加入任务队列,最后任务5 任务4被执行,没有任务后 两个消费线程被移除线程池。