Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列

简介: Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列

前言


在多线程编程中,线程池是一种非常重要的工具。它可以帮助我们高效地管理线程资源,避免频繁创建和销毁线程带来的性能开销。Java中提供了强大的线程池实现,如 ThreadPoolExecutor ,但有时我们可能希望了解其内部原理,并实现一个简单的线程池来加深对其工作机制的理解,手写线程池也是很多大厂常考的笔试题。


开个新坑-手搓简易线程池。本系列文章将从零开始,一步步手工编码实现一个简单但功能完备的线程池。我们将逐步介绍线程池的各个核心组件,并分析它们的作用和实现思路。通过这个过程,我们可以更好地掌握多线程编程的技巧,并培养编码能力和系统性思维。


本篇文章的任务是带领大家定义任务等待队列,任务等待队列是线程池中一个十分重要的组成部分,在各种生产者-消费者模型场景下十分常见,如下我们先来介绍一下等待队列。

等待队列是什么


等待队列(Task Queue)是一种用于临时存储任务的队列数据结构。在多线程环境下,它常被用作生产者-消费者模型中的"缓冲区",用于平衡任务的产生速度和任务的处理速度之间的差异。


等待队列本质上是一个先进先出(FIFO)的队列,新加入的任务会被存储在队列尾部,而消费者则从队列头部获取任务执行。根据队列的实现方式不同,它可以是无界的或者有界的。无界队列理论上可以存储无限多的任务,而有界队列则有最大容量限制。


我们这里的等待队列底层可以借助JDK提供的双端队列ArrayDeque实现。

为什么需要等待队列

在现实场景中,任务的产生速度往往是不均匀的,而执行任务的线程资源又是有限的。如果没有等待队列,当任务瞬间扩增时,可能会出现以下问题:


任务无法被及时消费,导致任务丢失或被阻塞。

需要临时创建大量线程来处理任务,线程的创建和销毁开销巨大,影响系统性能。

任务的执行顺序无法得到保证,可能导致某些重要任务长期得不到执行。

引入等待队列后,生产者可以先将任务存入队列,而消费者则持续从队列中获取并执行任务。这种"缓冲"机制可以有效应对任务瞬间扩增的情况,并保证任务按先后顺序被逐个消费。


此外,通过设置有界队列,我们还可以限制队列的最大容量,防止任务无限堆积导致内存溢出。当队列已满时,我们可以采取拒绝策略(如直接丢弃、暂存等)来应对新加入的任务。如下图,任务等待队列其实就是联系任务生成者与任务消费者的一个桥梁,生产者生产消息放入等待队列中,再由消费者拿取消费。

实现思路

为了实现一个功能完备的任务等待队列,我们需要设计一个阻塞队列BlockQueue,它具有以下几个关键特性:


有界队列 BlockQueue将设置一个固定的容量size,队列中最多只能存储size个任务。这样可以防止任务无限制地堆积,导致内存溢出。当队列满时,新添加的任务将被阻塞,直到队列中有空位。


线程安全 BlockQueue的操作,包括添加任务put和获取任务take,都需要保证线程安全。我们将使用Java的重入锁ReentrantLock和条件变量Condition来实现线程的正确同步。


支持超时 在添加或获取任务时,BlockQueue将提供带超时时间的方法offer和poll。这样可以防止任务被无限期阻塞,提高系统的响应性和健壮性。


基于以上思路,BlockQueue的实现将涉及以下几个核心部分:


使用双端队列Deque作为底层数据结构存储任务

使用ReentrantLock和两个Condition(emptyCondition和fullCondition)来实现阻塞和唤醒机制

实现put、offer、take、poll等核心方法,正确控制任务的添加和获取

代码实现

1.新建BlockQueue类

我们这里新建一个名为BlockQueue的类,并声明一些属性与字段,其中ArrayDeque为一个双端队列,可以在队列的两端操作元素,size为定义的队列大小

public class BlockQueue<T> {
    // 双端队列
    private Deque<T> deque = new ArrayDeque<>();
    // 队列的容量
    private int size;
 
    public BlockQueue(int size) {
        this.size = size;
    }
}


2.任务的添加和获取方法

接下来我们编写向队列中添加任务与获取任务的方法,但是这里要注意的是,读写操作都是在线程池多线程的环境下进行的,存在线程安全问题,所以我们需要配合锁机制来保证操作的原子性。我们这里用JDK提供的ReentrantLock可重入锁来解决这个问题。


继续新增ReentrantLock字段:

private ReentrantLock lock = new ReentrantLock();

此外,我们还需要解决任务添加与获取时的等待操作,也就是当队列为空时,消费者需要等待任务产生,当队列满时,生产者需要等待队列中有空位才能存入任务。如下图,当消费者获取到锁时,会尝试获取任务,但发现队列为空,就会阻塞等待。



当生产者生产任务后,肯定不能让消费者干等着,而是去通知消费者有活做啦~如下图:

我们可以看到这里涉及到了生产者与消费者不同线程的通讯,这里我们可以借助Condition来完成消费者线程与生产者线程之间的通讯。


分别定义两个Condition代表队列满情况的等待室与队列空情况的等待室

    // 队列空情况的休息室
    Condition emptyCondition = lock.newCondition();
 
    // 队列满情况的休息室
    Condition fullCondition = lock.newCondition();


定义阻塞添加任务方法put
// 添加任务 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (size == deque.size()) {
                try {
                    fullCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("task 添加成功 ,{}", task);
            deque.addLast(task);
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }


这里添加任务会存在两张情况:

  • 队列满了:调用fullCondition.await()方法挂起当前生产者线程,也就是让当前生产者线程等待。
  • 队列没满:将任务加入队列中,并调用emptyCondition.signal()通知挂起的消费者。


编写堵塞拿取任务方法take
    // 获取任务
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyCondition.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullCondition.signal();
            log.debug("获取了任务 {}", t);
            return t;
        } finally {
            lock.unlock();
        }
    }

同理,这里拿取任务也会存在两张情况:

  • 队列空的:调用emptyCondition.await()方法挂起当前消费者线程,也就是让当前消费者线程等待。
  • 队列存在元素:任务出栈,并返回出栈的任务元素,然后调用fullCondition.signal()通知挂起的生产者。


以上我们其实我们就完成了一个简单的任务堵塞队列,但是我们会发现,如果这两个方法都是会一直堵塞,显然是不合理的,所以我们这里新增添加和获取任务的超时方法。

带超时时间的阻塞添加方法offer
// 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (deque.size() == size) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            deque.addLast(task);
            emptyCondition.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

我们可以利用fullCondition.awaitNanos来实现超时等待,当超过给定参数时间时,就会被自动唤醒,并且将传入时间-等待时间作为返回值,下一次循环nanos <= 0时,就会判定为本次操作超时失败了

带超时时间的阻塞获取方法poll
// 带超时时间阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        // 1.上锁
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // 转为毫秒
            // 2.首先检查队列是否存在元素
            while(deque.isEmpty()){
                try {
                    // 2.1超时判断,返回值是剩余时间
                    if(nanos <= 0){
                        return null;
                    }
                    // 2.2超时等待
                    log.debug("等待获取任务");
                    nanos = emptyCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 3.拿取元素
            T task = deque.removeFirst();
            log.info("任务拿取成功:{}", task);
            // 4.唤醒挂起的生产者
            fullCondition.signal();
            return task;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

总结

我们在本节内容中实现线程池中一个重要的组件 - 任务等待队列BlockQueue,编写的对应的代码也已同步到了github仓库中(本章节可参考liushijie-240328-factory分支),估计还要两次文章的内容才能完成手写线程池部分的内容,后面主要要完成的就是线程池内部运行的基本逻辑,自定义拒绝策略等内容了,博主也是边学边实践边输出教学文章,如果有什么问题都可以在评论区留言,觉得写得不错的话就多多支持吧~

相关文章
|
4月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
181 0
|
4月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
291 0
|
6月前
|
存储 JSON Java
《从头开始学java,一天一个知识点》之:方法定义与参数传递机制
**你是否也经历过这些崩溃瞬间?** - 看了三天教程,连`i++`和`++i`的区别都说不清 - 面试时被追问&quot;`a==b`和`equals()`的区别&quot;,大脑突然空白 - 写出的代码总是莫名报NPE,却不知道问题出在哪个运算符 🚀 这个系列就是为你打造的Java「速效救心丸」!我们承诺:每天1分钟,地铁通勤、午休间隙即可完成学习;直击痛点,只讲高频考点和实际开发中的「坑位」;拒绝臃肿,没有冗长概念堆砌,每篇都有可运行的代码标本。上篇:《输入与输出:Scanner与System类》 | 下篇剧透:《方法重载与可变参数》。
127 25
|
6月前
|
存储 监控 Java
《从头开始学java,一天一个知识点》之:数组入门:一维数组的定义与遍历
**你是否也经历过这些崩溃瞬间?** - 看了三天教程,连`i++`和`++i`的区别都说不清 - 面试时被追问&quot;`a==b`和`equals()`的区别&quot;,大脑突然空白 - 写出的代码总是莫名报NPE,却不知道问题出在哪个运算符 这个系列就是为你打造的Java「速效救心丸」!我们承诺:每天1分钟,地铁通勤、午休间隙即可完成学习;直击痛点,只讲高频考点和实际开发中的「坑位」;拒绝臃肿,没有冗长概念堆砌,每篇都有可运行的代码标本。明日预告:《多维数组与常见操作》。 通过实例讲解数组的核心认知、趣味场景应用、企业级开发规范及优化技巧,帮助你快速掌握Java数组的精髓。
117 23
|
5月前
|
存储 Java 数据挖掘
Java 中数组的多种定义方式
本文深入解析了Java中数组的多种定义方式,涵盖基础的`new`关键字创建、直接初始化、动态初始化,到多维数组、`Arrays.fill()`方法以及集合类转换为数组等高级用法。通过理论与实践结合的方式,探讨了每种定义方法的适用场景、优缺点及其背后的原理,帮助开发者掌握高效、灵活的数组操作技巧,从而编写更优质的Java代码。
180 0
|
6月前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
274 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
6月前
|
数据采集 存储 安全
Python爬虫实战:利用短效代理IP爬取京东母婴纸尿裤数据,多线程池并行处理方案详解
本文分享了一套结合青果网络短效代理IP和多线程池技术的电商数据爬取方案,针对京东母婴纸尿裤类目商品信息进行高效采集。通过动态代理IP规避访问限制,利用多线程提升抓取效率,同时确保数据采集的安全性和合法性。方案详细介绍了爬虫开发步骤、网页结构分析及代码实现,适用于大规模电商数据采集场景。
|
2月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
117 0
|
2月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
3月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
216 5

热门文章

最新文章