【多线程:线程池】自定义线程池

简介: 【多线程:线程池】自定义线程池

【多线程:线程池】自定义线程池

01.介绍

建议:学习本文章前最好对生产者消费者模式熟悉,可以看我之前的文章https://blog.csdn.net/m0_71229547/article/details/125435005

这个图就是自定义线程池的实现结构,它是用生产者消费者模式 实现的,它有三个部分 Thread Pool、Blocking Queue、main 组成。
Thread Pool是线程池 它的主要作用是 进行线程的创建与任务的执行 以及 把超过线程池部分的任务交给主线程进行拒绝策略处理,我们可以把线程池中的线程当做消费者。
main是任务的生产者,主要作用是 任务的生产、线程池的创建、规定拒绝策略、执行拒绝策略
Blocking Queue是阻塞队列,包括两个功能 生产者生产任务 放入任务队列、消费者获取任务 从任务队列中取出。当任务数大于线程数时 把任务放入到任务队列 假如任务队列也放满了 则wait生产者生产 然后如果之后消费者进行了消费 则唤醒生产者 然后执行生产。当任务数为空时 消费者会持续获取任务队列里的任务 具体表现是 wait消费者线程 当有任务时唤醒 然后执行消费。

02.具体实现与解释

实现

@Slf4j(topic = "c.TestPool")
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,
                1000, TimeUnit.MILLISECONDS, 2, (queue, task)->{
            // 1. 死等
            queue.put(task);
            // 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);
            // 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
//            task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            System.out.println(i);
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

@FunctionalInterface // 拒绝策略,函数式接口
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务时的超时时间
    private long timeout;

    // 统一时间格式
    private TimeUnit timeUnit;

    // 拒接策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行

            // 这种处理是如果没有任务我们就死等,也可以理解为 这个线程始终在线程池内,等待任务的出现
//          while(task != null || (task = taskQueue.take()) != null) {

            // 这种处理方式是如果在规定时间内没有任务,我们就任务现在没有任务了 然后 移除线程,
            // 也可以理解为在规定时间内这个线程在线程池中 超过一定时间我们就这个线程从线程池中移除
            while(task != null || (task = taskQueue.pull(timeout, timeUnit)) != null) {

                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();

    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时阻塞获取
    public T pull(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if(queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else {  // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

解释

可以看出来代码量还是很大的,我们逐个分析每个部分。

BlockingQueue

这个类的阻塞队列类,比较重要的是put方法与take方法。
put方法的主要作用是生产者线程生产的任务大于了线程池容量 然后生产者线程把多出的任务通过put方法放到了任务队列里,如果任务超出了任务队列的大小 则wait生产者线程 不再生产 直到有消费者线程执行了任务队列的任务 此时唤醒生产者线程继续生产。
take方法的主要作用是如果此时线程池中的消费者线程执行完了自己的任务 然后就会通过take方法从任务队列中获取任务 并且从任务队列中移除 如果此时获取不到任务 说明任务队列为空 然后wait当前这个消费者线程 直到任务队列重新有任务 唤醒消费者线程继续消费任务。
put方法与offer方法的对比:put方法是如果任务队列已经满了 我们就生产线程还会生产一个任务 只不过这个任务陷入了等待 直到任务队列不满时才会重新执行。offer方法是有时限的添加任务 如果此时任务队列满了 我们生产线程还会生产一个任务 只不过这个任务会进入有时限的等待 如果超过这个时间我们就放弃这个任务 转而执行下一个任务。
take方法与pull方法:take方法是如果任务队列为空 则此消费线程此时获取不到任务 然后陷入等待 直到任务队列不为空 然后获取到这个任务并把这个任务从任务队列中移除,可以理解为这个线程一直在线程池内等待新的任务的到来。pull方法是如果任务队列为空 此时消费线程此时获取不到任务 然后会进入有时限的等待 如果超过这个时限还没有获取到任务 则把这个线程从线程池中移除。

ThreadPool

这个类是线程池类,需要关注execute方法与有参构造
有参构造:参数是线程池需要的配置,有coreSize 线程核心数(线程池大小)、queueCapcity 任务队列大小 用来创建任务队列时传参、timeout take方法的超时时间、timeUnit 统一时间配置、rejectPolicy拒绝策略
execute方法是执行任务的方法,线程数没有超过核心数时:我们 创建线程 把线程加入线程集合 并启动线程执行run方法 run方法通过take/pull方法从任务队列里获取任务 如果获取到任务就执行任务 如果没有获取到任务就等待。线程数超过核心数时:说明我们的任务数量已经大于线程数了 此时又因为我们的线程池已经满了 所以转而由生产线程执行拒绝策略。

RejectPolicy

拒绝策略函数式接口,需要关注它的实现,它的作用是当任务数大于核心数时 此时消费线程不能再执行多出的任务,此时生产线程执行拒绝策略 具体讨论这部分的任务应该怎么处理。
实现:拒绝策略是在主线程创建线程池对象时写的,在代码中我们也能看到 它分为五种处理策略,==死等==:可以看出它执行的是put方法 也就是多出的任务放入任务队列 直到任务队列满 不过如果此时仍然有多出任务 就会一直等待任务队列到不满时 然后等待的任务加入任务队列。==带超时等待==:也就是执行offer方法 和put方法基本一样 只不过在任务队列满了后 进入有时限的等待 如果超过时间则放弃这个任务 执行下一个任务。==让调用者放弃任务执行==:也就是如果任务数超过核心数 且任务队列已满,超出部分的任务生产者直接放弃。==让调用者抛出异常==:如果任务数超过核心数 且任务队列已满,生产者直接抛出异常。==让调用者自己执行任务==:如果任务数超过核心数 且任务队列已满,生产者自己把这个任务执行了。

03.不同情况下线程池的执行情况

我们以下结果的拒绝策略都为死等,消费线程获取任务队列用的是pull方法

核心数+任务队列=任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为4

结果

解释
上来先新增两个任务0 1,然后直接被Thread0 Thread1执行了,多出的两个任务2 3我们放入了任务队列 也就是图片里的加入任务队列,1s后 线程池的两个线程执行任务完毕 从线程池中获取新的任务,所以此时2 3任务被从任务队列中取出 然后Thread0 Thread1执行 2 3任务,最后 因为我们用的是pull方法有时限获取在1s后没有获取到任务 所以把Thread0 Thread1移除线程池

核心数+任务队列<任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为6
结果

任务数>线程数+任务队列,又因为我们每一个任务都要执行1s,所以势必会有任务等待加入任务队列,我们看上图,可以发现0 1任务被执行 2 3任务进入任务队列 4任务等待加入任务队列,因为此时生产者线程陷入等待 所以任务5就没有被生产出来,当0 1任务执行完后,Thread0 线程执行任务2 任务4加入任务队列 此时任务队列又满了 所以任务5此时等待加入任务队列,然后Thread1执行任务3 然后任务5加入任务队列,最后任务5 任务4被执行,没有任务后 两个消费线程被移除线程池。

目录
相关文章
|
11天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
88 38
|
9天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
34 2
|
11天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
42 4
|
11天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
69 2
|
14天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
12 2
|
14天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
27 2
|
29天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
38 1
C++ 多线程之初识多线程
|
14天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
13 3
|
14天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
26 1
|
14天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
26 1