【Java|多线程与高并发】阻塞队列以及生产者-消费者模型

简介: 阻塞队列(BlockingQueue)常用于多线程编程中,可以实现线程之间的同步和协作。它可以用来解决生产者-消费者问题,其中生产者线程将元素插入队列,消费者线程从队列中获取元素,它们之间通过阻塞队列进行协调。

1. 前言

阻塞队列(BlockingQueue)常用于多线程编程中,可以实现线程之间的同步和协作。它可以用来解决生产者-消费者问题,其中生产者线程将元素插入队列,消费者线程从队列中获取元素,它们之间通过阻塞队列进行协调。

01d3d277a6674aaab309230f4022c4ac.gif


2. 阻塞队列

Java中的阻塞队列(BlockingQueue)是一种特殊的队列,它在队列为空时会阻塞获取元素的操作,直到队列中有新的元素被添加进来;在队列已满时会阻塞插入元素的操作,直到队列中有空的位置。


需要注意的是 在Java中 BlockingQueue是一个接口

a9d280464fb5411fbae22050c5068043.png


Java中提供了多种阻塞队列的实现,包括:


1.ArrayBlockingQueue:基于数组实现的有界阻塞队列,它按照先进先出的顺序对元素进行排序。

2.LinkedBlockingQueue:基于链表实现的可选有界阻塞队列,它可以指定容量,如果不指定则默认为无界队列。

3.PriorityBlockingQueue:基于优先级堆实现的无界阻塞队列,它可以按照元素的优先级进行排序。

4.SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作。

5.LinkedBlockingDeque: 基于链表的双端阻塞队列。

6.LinkedTransferQueue: 基于链表、无界的的阻塞队列。

作为一个队列,有三个基本操作,入队,出队和查看队首元素.


在使用阻塞队列时,有两点需要注意:


offer和put可以实现入队列,offer并没有阻塞功能,put具有阻塞功能

poll和take可以实现出队列,poll没有阻塞功能,take具有阻塞功能

示例:


public class Demo14 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue  = new LinkedBlockingQueue<>(10);
        blockingQueue.put(1);
        blockingQueue.put(2);
        int ret = blockingQueue.take();
        System.out.println(ret);
        ret = blockingQueue.take();
        System.out.println(ret);
        blockingQueue.take();
        System.out.println(ret);
    }
}

代码分析:


72545de55ffe49d29737c6fdbf22edcb.png

3. 实现阻塞队列

阻塞队列的实现并不复杂,主要是通过这个过程,强化对于阻塞队列的认识.


对于实现阻塞队列可以大致分为3步:


实现一个普通的队列

保证线程安全

加上阻塞功能

实现一个普通队列的方法可以使用链表,也能够使用数组.


接下来就基于数组来实现一个 循环队列


代码:


public class MyBlockingQueue {
        private int[] elem;
        private int usedSize = 0;// 有效个数
        private int front = 0;
        private int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val){
            // 判断队列是否满了
            if (usedSize >= elem.length){
                return;
            }
            // 进行入队列操作
            elem[rear] = val;
            rear++;
            if (rear >= elem.length){
                rear = 0;
            }
            usedSize++;
        }
        /**
         * 出队
         * @return
         */
        public Integer take(){
            if (usedSize == 0){
                return null;
            }
            int ret = elem[front];
            front++;
            if (front >= elem.length){
                front = 0;
            }
            usedSize--;
            return ret;
        }
}

循环队列在判断队列是否满时有两种方式:


1.用计数器记录有效数据的个数(上述代码使用的是这种方法)


2.浪费一个空间不用. 当(rear+1) % 数组长度== front时为队列满这种情况


以上就是一个简单的循环队列的实现,详细可以参考我的这篇文章【数据结构与算法】队列-模拟实现队列以及设计循环队列


解决了第一步,接下来就来实现第二步,解决线程安全问题


解决这个问题,就离不开synchronized了,以防万一再给变量加上volatile关键字


public class MyBlockingQueue {
        private int[] elem;
        private volatile int usedSize = 0;// 有效个数
        private volatile int front = 0;
        private volatile int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val){
            synchronized (this) {
                // 判断队列是否满了
                if (usedSize >= elem.length){
                    return;
                }
                // 进行入队列操作
                elem[rear] = val;
                rear++;
                if (rear >= elem.length){
                    rear = 0;
                }
                usedSize++;
            }
        }
        /**
         * 出队
         * @return
         */
        public Integer take(){
            synchronized (this) {
                if (usedSize == 0){
                    return null;
                }
                int ret = elem[front];
                front++;
                if (front >= elem.length){
                    front = 0;
                }
                usedSize--;
                return ret;
            }
        }
}

解决完线程安全问题,接下来就是给put和take添加阻塞功能.


以下两种场景会有阻塞功能:


如果队列为空,出队列需要阻塞


如果队列为满,入队列需要阻塞


阻塞功能很好加,直接使用wait方法即可


但除了阻塞,还要通知唤醒线程,. 例如线程为空,出队列在阻塞状态,而入队列之后,队列不为空,就要让出队列继续执行才行. 因此还要搭配notify来实现


阻塞队列实现代码如下:


public class MyBlockingQueue {
        private int[] elem;
        private volatile int usedSize = 0;// 有效个数
        private volatile int front = 0;
        private volatile int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val) throws InterruptedException {
            synchronized (this) {
                // 判断队列是否满了
                while (usedSize >= elem.length){
                    this.wait();
                }
                // 进行入队列操作
                elem[rear] = val;
                rear++;
                if (rear >= elem.length){
                    rear = 0;
                }
                usedSize++;
                this.notify();
            }
        }
        /**
         * 出队
         * @return
         */
        public Integer take() throws InterruptedException {
            synchronized (this) {
                while (usedSize == 0){
                    this.wait();
                }
                int ret = elem[front];
                front++;
                if (front >= elem.length){
                    front = 0;
                }
                usedSize--;
                this.notify();
                return ret;
            }
        }
}

代码分析:



6571b5f6b5b044edad418f6e9d9b232b.png



此时出队列和入队列是相互唤醒的状态, 两个wait的执行条件互斥,因此不会出现同时阻塞的状态.


同时将wait的执行条件改为while,是因为如果线程是被interrupt唤醒的话,队列仍然为空,就不能去执行后续代码,因此要再进行条件判断,因此改为while更加稳妥


验证:


5142af5f546c4507af0fa87fa1abfb16.png


使用阻塞队列的好处:


1.使用阻塞队列,有利于代码"解耦合"

2.削峰填谷,利用生产者消费者模型在并发量高的时候将这些并发量分配到每一个服务器上.

4. 生产者-消费者模型

接下来来介绍生产者-消费者模型


生产者消费者模型是一种常见的并发编程模型,用于解决生产者和消费者之间的协作问题。在该模型中,生产者负责生产数据,消费者负责消费数据,它们通过共享的缓冲区来进行通信。


生产者消费者模型的主要特点:


1.生产者和消费者是两个独立的实体,它们可以运行在不同的线程中。

2.生产者负责生成数据,并将数据放入缓冲区中。

3.消费者负责从缓冲区中取出数据,并进行相应的处理。

4.缓冲区是生产者和消费者之间的共享数据结构,用于存储生产者生产的数据。

5.生产者和消费者之间的操作是互斥的,即同一时间只能有一个生产者或一个消费者对缓冲区进行操作。

为了实现生产者消费者模型,可以使用阻塞队列来作为缓冲区。为了实现生产者消费者模型,可以使用阻塞队列来作为缓冲区。


生产者消费者模型的基本流程:


1.创建一个共享的阻塞队列,作为生产者和消费者之间的缓冲区。

2.创建生产者线程,它生成数据并将其放入队列中。

3.创建消费者线程,它从队列中获取数据并进行处理。

4.启动生产者和消费者线程,它们会并发执行。

5.生产者线程生成数据并插入队列,当队列已满时会被阻塞。

6.消费者线程从队列中获取数据并进行处理,当队列为空时会被阻塞。

7.生产者和消费者线程可以通过阻塞队列进行同步和协作,生产者在队列已满时会等待,直到有空闲位置可以插入数据;消费者在队列为空时会等待,直到有数据可以获取。

代码示例:

public class Demo16 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        // 生产者
        Thread t1 = new Thread(() ->{
            for (int i = 0; i < 1000; i++) {
                try {
                    blockingQueue.put(i);
                    System.out.println("生产元素: "+ i);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        // 消费者
        Thread t2 = new Thread(() ->{
            while(true){
                try {
                    Integer ret = blockingQueue.take();
                    System.out.println("消费元素: "+ ret);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t2.start();
    }
}

运行截图:


8dfa121585144f79b7adb77a25adb787.png


上述代码仅仅只是一个示例,并没有什么实际意义.


生产者消费者模型依旧很重要,它可以有效地实现线程间的协作和资源共享,提高系统的并发性和吞吐量。


5. 总结

阻塞队列的使用可以简化多线程编程的复杂性,避免手动实现线程间的同步和协作逻辑,提高代码的可读性和可维护性。基于阻塞队列的生产者-消费者模型也要重点掌握.。阻塞队列作为生产者和消费者之间的缓冲区,提供线程安全的插入和获取操作,并在队列为空或队列已满时进行阻塞,从而实现线程间的同步。

fb260d68d6a4443b928563ce9320c99a.gif

相关文章
|
3月前
|
Java 大数据 Go
从混沌到秩序:Java共享内存模型如何通过显式约束驯服并发?
并发编程旨在混乱中建立秩序。本文对比Java共享内存模型与Golang消息传递模型,剖析显式同步与隐式因果的哲学差异,揭示happens-before等机制如何保障内存可见性与数据一致性,展现两大范式的深层分野。(238字)
114 4
|
3月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
236 1
|
3月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
250 1
|
4月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
209 0
|
4月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
379 16
|
5月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。
|
5月前
|
缓存 前端开发 Java
Java类加载机制与双亲委派模型
本文深入解析Java类加载机制,涵盖类加载过程、类加载器、双亲委派模型、自定义类加载器及实战应用,帮助开发者理解JVM核心原理与实际运用。
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
Java 大视界 -- Java 大数据机器学习模型在自然语言生成中的可控性研究与应用(229)
本文深入探讨Java大数据与机器学习在自然语言生成(NLG)中的可控性研究,分析当前生成模型面临的“失控”挑战,如数据噪声、标注偏差及黑盒模型信任问题,提出Java技术在数据清洗、异构框架融合与生态工具链中的关键作用。通过条件注入、强化学习与模型融合等策略,实现文本生成的精准控制,并结合网易新闻与蚂蚁集团的实战案例,展示Java在提升生成效率与合规性方面的卓越能力,为金融、法律等强监管领域提供技术参考。
|
5月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据机器学习模型在生物信息学基因功能预测中的优化与应用(223)
本文探讨了Java大数据与机器学习模型在生物信息学中基因功能预测的优化与应用。通过高效的数据处理能力和智能算法,提升基因功能预测的准确性与效率,助力医学与农业发展。

热门文章

最新文章