【多线程:线程池】自定义线程池

简介: 【多线程:线程池】自定义线程池

【多线程:线程池】自定义线程池

01.介绍

建议:学习本文章前最好对生产者消费者模式熟悉,可以看我之前的文章https://blog.csdn.net/m0_71229547/article/details/125435005

这个图就是自定义线程池的实现结构,它是用生产者消费者模式 实现的,它有三个部分 Thread Pool、Blocking Queue、main 组成。
Thread Pool是线程池 它的主要作用是 进行线程的创建与任务的执行 以及 把超过线程池部分的任务交给主线程进行拒绝策略处理,我们可以把线程池中的线程当做消费者。
main是任务的生产者,主要作用是 任务的生产、线程池的创建、规定拒绝策略、执行拒绝策略
Blocking Queue是阻塞队列,包括两个功能 生产者生产任务 放入任务队列、消费者获取任务 从任务队列中取出。当任务数大于线程数时 把任务放入到任务队列 假如任务队列也放满了 则wait生产者生产 然后如果之后消费者进行了消费 则唤醒生产者 然后执行生产。当任务数为空时 消费者会持续获取任务队列里的任务 具体表现是 wait消费者线程 当有任务时唤醒 然后执行消费。

02.具体实现与解释

实现

@Slf4j(topic = "c.TestPool")
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,
                1000, TimeUnit.MILLISECONDS, 2, (queue, task)->{
            // 1. 死等
            queue.put(task);
            // 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);
            // 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
//            task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            System.out.println(i);
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

@FunctionalInterface // 拒绝策略,函数式接口
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务时的超时时间
    private long timeout;

    // 统一时间格式
    private TimeUnit timeUnit;

    // 拒接策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行

            // 这种处理是如果没有任务我们就死等,也可以理解为 这个线程始终在线程池内,等待任务的出现
//          while(task != null || (task = taskQueue.take()) != null) {

            // 这种处理方式是如果在规定时间内没有任务,我们就任务现在没有任务了 然后 移除线程,
            // 也可以理解为在规定时间内这个线程在线程池中 超过一定时间我们就这个线程从线程池中移除
            while(task != null || (task = taskQueue.pull(timeout, timeUnit)) != null) {

                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();

    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时阻塞获取
    public T pull(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if(queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else {  // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

解释

可以看出来代码量还是很大的,我们逐个分析每个部分。

BlockingQueue

这个类的阻塞队列类,比较重要的是put方法与take方法。
put方法的主要作用是生产者线程生产的任务大于了线程池容量 然后生产者线程把多出的任务通过put方法放到了任务队列里,如果任务超出了任务队列的大小 则wait生产者线程 不再生产 直到有消费者线程执行了任务队列的任务 此时唤醒生产者线程继续生产。
take方法的主要作用是如果此时线程池中的消费者线程执行完了自己的任务 然后就会通过take方法从任务队列中获取任务 并且从任务队列中移除 如果此时获取不到任务 说明任务队列为空 然后wait当前这个消费者线程 直到任务队列重新有任务 唤醒消费者线程继续消费任务。
put方法与offer方法的对比:put方法是如果任务队列已经满了 我们就生产线程还会生产一个任务 只不过这个任务陷入了等待 直到任务队列不满时才会重新执行。offer方法是有时限的添加任务 如果此时任务队列满了 我们生产线程还会生产一个任务 只不过这个任务会进入有时限的等待 如果超过这个时间我们就放弃这个任务 转而执行下一个任务。
take方法与pull方法:take方法是如果任务队列为空 则此消费线程此时获取不到任务 然后陷入等待 直到任务队列不为空 然后获取到这个任务并把这个任务从任务队列中移除,可以理解为这个线程一直在线程池内等待新的任务的到来。pull方法是如果任务队列为空 此时消费线程此时获取不到任务 然后会进入有时限的等待 如果超过这个时限还没有获取到任务 则把这个线程从线程池中移除。

ThreadPool

这个类是线程池类,需要关注execute方法与有参构造
有参构造:参数是线程池需要的配置,有coreSize 线程核心数(线程池大小)、queueCapcity 任务队列大小 用来创建任务队列时传参、timeout take方法的超时时间、timeUnit 统一时间配置、rejectPolicy拒绝策略
execute方法是执行任务的方法,线程数没有超过核心数时:我们 创建线程 把线程加入线程集合 并启动线程执行run方法 run方法通过take/pull方法从任务队列里获取任务 如果获取到任务就执行任务 如果没有获取到任务就等待。线程数超过核心数时:说明我们的任务数量已经大于线程数了 此时又因为我们的线程池已经满了 所以转而由生产线程执行拒绝策略。

RejectPolicy

拒绝策略函数式接口,需要关注它的实现,它的作用是当任务数大于核心数时 此时消费线程不能再执行多出的任务,此时生产线程执行拒绝策略 具体讨论这部分的任务应该怎么处理。
实现:拒绝策略是在主线程创建线程池对象时写的,在代码中我们也能看到 它分为五种处理策略,==死等==:可以看出它执行的是put方法 也就是多出的任务放入任务队列 直到任务队列满 不过如果此时仍然有多出任务 就会一直等待任务队列到不满时 然后等待的任务加入任务队列。==带超时等待==:也就是执行offer方法 和put方法基本一样 只不过在任务队列满了后 进入有时限的等待 如果超过时间则放弃这个任务 执行下一个任务。==让调用者放弃任务执行==:也就是如果任务数超过核心数 且任务队列已满,超出部分的任务生产者直接放弃。==让调用者抛出异常==:如果任务数超过核心数 且任务队列已满,生产者直接抛出异常。==让调用者自己执行任务==:如果任务数超过核心数 且任务队列已满,生产者自己把这个任务执行了。

03.不同情况下线程池的执行情况

我们以下结果的拒绝策略都为死等,消费线程获取任务队列用的是pull方法

核心数+任务队列=任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为4

结果

解释
上来先新增两个任务0 1,然后直接被Thread0 Thread1执行了,多出的两个任务2 3我们放入了任务队列 也就是图片里的加入任务队列,1s后 线程池的两个线程执行任务完毕 从线程池中获取新的任务,所以此时2 3任务被从任务队列中取出 然后Thread0 Thread1执行 2 3任务,最后 因为我们用的是pull方法有时限获取在1s后没有获取到任务 所以把Thread0 Thread1移除线程池

核心数+任务队列<任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为6
结果

任务数>线程数+任务队列,又因为我们每一个任务都要执行1s,所以势必会有任务等待加入任务队列,我们看上图,可以发现0 1任务被执行 2 3任务进入任务队列 4任务等待加入任务队列,因为此时生产者线程陷入等待 所以任务5就没有被生产出来,当0 1任务执行完后,Thread0 线程执行任务2 任务4加入任务队列 此时任务队列又满了 所以任务5此时等待加入任务队列,然后Thread1执行任务3 然后任务5加入任务队列,最后任务5 任务4被执行,没有任务后 两个消费线程被移除线程池。

目录
相关文章
|
4月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
200 0
|
1月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
147 2
|
4月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
5月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
369 5
|
9月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
445 60
【Java并发】【线程池】带你从0-1入门线程池
|
7月前
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
229 18
|
1月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
146 6
|
4月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
302 83
|
1月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
230 0
|
2月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
220 16

热门文章

最新文章