Java阻塞队列

简介: Java阻塞队列



一、什么是阻塞队列

阻塞队列(Blocking Queue)是一种特殊的队列,因其为队列,因此遵循“先进先出”的原则,此外,其是一种线程安全的数据结构,具有以下两点特性:

1. 当队列满时,继续入队列就会阻塞,直到有线程将元素从队列中取出

2. 当队列空时,继续出队列就会阻塞,直到有线程向队列中插入元素

阻塞队列常用于“生产者消费者模型

什么是“生产者消费者模型”?

“生产者消费者模式”是一种典型的开发模型,通过一个容器来解决生产者和消费者的强耦合问题

生产者:向队列中添加元素的线程

消费者:向队列中取出元素的线程

生产者和消费者之间不直接通讯,而是通过阻塞队列来通讯,生产者产出数据后,不用等待消费者处理数据,而是将数据放入阻塞队列中;消费者不用等待消费者产出数据,而是直接从阻塞队列中取出元素。

生产者和消费者之间直接通讯:

通过阻塞队列通讯:

为什么要在生产者和消费者之间使用阻塞队列?

1. 阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力

当生产者突然产出大量数据,若直接将这些数据交给消费者处理,消费者可能一时处理不了这么多数据,而若生产者将这些数据放入阻塞队列中,消费者则可慢慢从阻塞队列中取出数据来处理

2. 阻塞队列能够使生产者和消费者之间解耦合

若生产者直接将数据交给消费者处理,则生产者要考虑消费者处理数据的能力,一旦消费者未能及时处理数据,生产者则要等待消费者处理完数据;同样,一旦生产者未能及时产出数据,消费者就得等待生产者产出数据。

而若加上阻塞队列,生产者只负责产出数据,并将其放入阻塞队列中,消费者只负责从阻塞队列中取出数据并处理。此时就相当于“流水线”,每个线程只专注完成自己的任务。

二、阻塞队列的使用

Java标准库中内置了阻塞队列,当我们需要使用阻塞队列时,可直接使用标准库中提供的阻塞队列。

BlockingQueue是一个接口,其实现类是ArrayBlockingQueue,LinkedBlockingQueue等

Java中提供的阻塞队列有:

ArrayBlockingQueue:基于数组实现的有界阻塞队列

LinkedBlockingQueue:基于链表实现的有界阻塞队列

PriorityBlockingQueue:支持优先级的无界阻塞队列

DelayQueue:基于PriorityBlockingQueue实现的无界延迟队列

SynchronousQueue:不存储元素的阻塞队列

LinkedTransferQueue:基于链表实现的无界阻塞队列

LinkedBlockingDeque:基于链表实现的双向阻塞队列

阻塞队列类中的常用方法

void put(E e) throws InterruptedException

put 方法用于插入元素,当队列未满时,将元素插入队列尾,若队列已满,则无法继续插入元素,阻塞,直到队列中有空闲空间,才解除阻塞状态,并将元素插入到队列中

E take() throws InterruptedException

take 方法用于取出队列中队首元素, 并返回该元素,若队列中有元素,则正常取出数据,若队列为空,则阻塞,直到队列中有元素,解除阻塞状态,并取出该元素

boolean add(E e)

add 方法用于向队列中添加元素,若添加成功,则返回true,若队列已满,添加失败,add 方法会抛出异常

E remove()

remove 方法用于删除元素,并返回,当队列为空时,remove 方法会抛出异常

E element()

element 方法用于查看队首元素,并返回,但是不将其从队列中删除,若队列为空,则抛出异常

boolean offer(E e)

offer 方法用于添加元素,若添加成功,则返回true,若队列已满,添加失败时,返回false

E poll()

poll 方法用于移除并返回队首元素,若队列为空,则返回null,因此,不允许向队列中插入null,否则我们无法区分返回的null是队列中的元素还是队列为空时的提示

E peek()

peek 方法用于查看队首元素,并返回,但是不将其从队列中删除,当队列为空时,返回null

总结:

方法 抛出异常 通过返回值判断 阻塞
插入元素 add(e) offer(e) put(e)
获取并移除队首元素 remove() poll() take()
获取但不移除队首元素 element() peek()

三、模拟实现阻塞队列

了解了什么是阻塞队列和阻塞队列的常用方法,那么阻塞队列是如何实现的呢?

我们通过模拟实现阻塞队列中带有阻塞功能的 put 和 take 方法来进一步了解阻塞队列

在这里我们想通过模拟实现 put 和 take 来了解阻塞队列,因此为了简单,我们将存储的元素定义为Integer,而不写作泛型的形式

如何实现 put 和 take 方法?

首先我们分析 put 和 take 方法分别要实现哪些功能

put :

1. 实现入队列操作

2. 保证线程安全

3. 当队列满时,阻塞

take :

1. 实现出队列操作

2. 保证线程安全

3. 当队列为空时,出队列阻塞

1. 实现入队列和出队列操作:

要想实现队列满后,若有元素出队列,则可继续插入元素,需要使用循环队列,这里选择使用数组来实现循环队列

实现循环队列:

1. 定义front 指向队首元素,rear 指向队尾元素,usedSize 标记队列中已有元素

2. 若队列中有空闲空间,可以进行入队列

3. 若队列中有元素,可以进行出队列

public class MyArrayBlockingQueue {
    private int[] elems = null;
    private int front = 0;
    private int tail = 0;
    private int usedSize = 0;
    //初始化循环队列容量
    public MyArrayBlockingQueue(int capacity){
        if(capacity <= 0){
            throw new IllegalArgumentException("队列容量应大于0!");
        }
        elems = new int[capacity];
    }
    //实现入队列
    public void put(int elem){
        //若队列满,则不能向队列中添加元素
        if(usedSize >= elems.length){
            return;
        }
        //向队列中添加元素
        elems[tail++] = elem;
        //若tail指向队列最后一个位置之后,则将其放到0位置处
        if(tail >= elems.length){
            tail = 0;
        }
        usedSize++;
    }
    //实现出队列
    public int take(){
        //若队列为空,则不能入队列
        if(usedSize == 0){
            //此时我们暂时先用抛出异常的方式来标记不能入队列
            throw new RuntimeException();
        }
        //取出front位置元素
        int val = elems[front++];
        //若front指向队列最后一个位置之后,则将其放到0位置处
        if(front >= elems.length){
            front = 0;
        }
        usedSize--;
        return val;
    }
}

2. 保证线程安全

上面实现的循环队列操作是不安全的

例如:

由于 put 和 take 方法都有写操作,因此都需要加锁

这里我们通过锁对象来进行加锁操作(也可以通过 this 进行加锁操作)

在哪里加锁?

若我们仅对添加元素和删除元素加锁,仍不能上图中存在的问题,因此我们要对 判断和入队列(或出队列)操作 加锁

public class MyArrayBlockingQueue {
    private int[] elems = null;
    private int front = 0;
    private int tail = 0;
    private int usedSize = 0;
    private Object locker = new Object();//锁对象
    //初始化循环队列容量
    public MyArrayBlockingQueue(int capacity){
        if(capacity <= 0){
            throw new IllegalArgumentException("队列容量应大于0!");
        }
        elems = new int[capacity];
    }
    //实现入队列
    public void put(int elem){
        //加锁
        synchronized (locker){
            //若队列满,则不能向队列中添加元素
            if(usedSize >= elems.length){
                return;
            }
            //向队列中添加元素
            elems[tail++] = elem;
            //若tail指向队列最后一个位置之后,则将其放到0位置处
            if(tail >= elems.length){
                tail = 0;
            }
            usedSize++;
        }
    }
    //实现出队列
    public int take(){
        int val = 0;
        //加锁
        synchronized (locker){
            //若队列为空,则不能入队列
            if(usedSize == 0){
                //此时我们暂时先用抛出异常的方式来标记不能入队列
                throw new RuntimeException();
            }
            //取出front位置元素
            val = elems[front++];
            //若front指向队列最后一个位置之后,则将其放到0位置处
            if(front >= elems.length){
                front = 0;
            }
            usedSize--;
        }
        return val;
    }
}

3. 当队列为空时,出队列阻塞;当队列满时,入队列阻塞

在队列满的情况下,当有元素出队列时,解除入队列阻塞状态;

在队列为空的情况下,当有元素入队列,解除出队列阻塞状态;

//实现入队列
    public void put(int elem) throws InterruptedException {
        //加锁
        synchronized (locker){
            //若队列满,则不能向队列中添加元素,阻塞
            if(usedSize >= elems.length){
                locker.wait();
            }
            //向队列中添加元素
            elems[tail++] = elem;
            //若tail指向队列最后一个位置之后,则将其放到0位置处
            if(tail >= elems.length){
                tail = 0;
            }
            usedSize++;
            // 入队列成功,此时唤醒一个阻塞的出队列线程
            locker.notify();
        }
    }
    //实现出队列
    public int take() throws InterruptedException {
        int val = 0;
        //加锁
        synchronized (locker){
            //若队列为空,则不能入队列,阻塞
            if(usedSize == 0){
                locker.wait();
            }
            //取出front位置元素
            val = elems[front++];
            //若front指向队列最后一个位置之后,则将其放到0位置处
            if(front >= elems.length){
                front = 0;
            }
            usedSize--;
            //出队列成功,此时唤醒一个入队列阻塞的线程
            locker.notify();
        }
        return val;
    }

然而,此时代码还存在一些问题

我们本想通过 put 方法中的 notify 唤醒在队列空时 出队列阻塞的线程,但在上图中,我们却通过notify 唤醒了 队列满时 入队列阻塞的线程,从而导致在队列满的情况下,仍向其中插入元素

如何解决上述问题?

我们可以在判断队列为空时,将 if 改为 while ,每唤醒一次线程,再判断一次

即在wait之前进行一次判断,唤醒之后再进行一次判断。

因而在上图情况中,虽然t3线程被唤醒,但由于又进行了一次判定,此时判断队列仍满,因而再次阻塞,而不会向下执行入队列操作

因此,模拟实现的阻塞队列代码为:

public class MyArrayBlockingQueue {
    private int[] elems = null;
    private int front = 0;
    private int tail = 0;
    private int usedSize = 0;
    private Object locker = new Object();//锁对象
    //初始化循环队列容量
    public MyArrayBlockingQueue(int capacity){
        if(capacity <= 0){
            throw new IllegalArgumentException("队列容量应大于0!");
        }
        elems = new int[capacity];
    }
    //实现入队列
    public void put(int elem) throws InterruptedException {
        //加锁
        synchronized (locker){
            //若队列满,则不能向队列中添加元素,阻塞
            while (usedSize >= elems.length){
                locker.wait();
            }
            //向队列中添加元素
            elems[tail++] = elem;
            //若tail指向队列最后一个位置之后,则将其放到0位置处
            if(tail >= elems.length){
                tail = 0;
            }
            usedSize++;
            // 入队列成功,此时唤醒一个阻塞的出队列线程
            locker.notify();
        }
    }
    //实现出队列
    public int take() throws InterruptedException {
        int val = 0;
        //加锁
        synchronized (locker){
            //若队列为空,则不能入队列,阻塞
            while (usedSize == 0){
                locker.wait();
            }
            //取出front位置元素
            val = elems[front++];
            //若front指向队列最后一个位置之后,则将其放到0位置处
            if(front >= elems.length){
                front = 0;
            }
            usedSize--;
            //出队列成功,此时唤醒一个入队列阻塞的线程
            locker.notify();
        }
        return val;
    }
}

此时,我们再通过一个简单的生产者消费者模型来验证一下我们实现的阻塞队列是否有问题

public class Test {
    public static void main(String[] args) {
      MyArrayBlockingQueue queue = new MyArrayBlockingQueue(10);
      //生产者
        Thread producer = new Thread(() -> {
            int n = 1;
            while (true){
                try {
                    queue.put(n);
                    System.out.println("生产元素:" + n);
                    n++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //消费者
        Thread customer = new Thread(() -> {
            while (true){
                try {
                    int n = queue.take();
                    System.out.println("消费元素:" + n);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
        customer.start();
    }
}

运行结果:

目录
相关文章
|
3月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
46 0
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
2月前
|
存储 Java API
java线程之阻塞队列
java线程之阻塞队列
20 0
|
2月前
|
存储 缓存 Java
Java 中的阻塞队列
Java 中的阻塞队列
15 0
|
3月前
|
存储 安全 Java
Java多线程基础-9:代码案例之阻塞队列(二)
Java多线程基础教程系列中,介绍了如何实现一个简单的阻塞队列(非泛型版本)。
30 0
|
3月前
|
消息中间件 存储 负载均衡
Java多线程基础-9:代码案例之阻塞队列(一)
阻塞队列是一种遵循先进先出原则的线程安全数据结构,它在队列满时会阻塞入队操作,队列空时会阻塞出队操作,常用于多线程间的协作,简化同步代码编写。Java中提供了`BlockingQueue`接口及其实现类,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,用于实现生产者-消费者模型,以实现负载均衡和资源的有效利用,如削峰填谷,降低系统压力。
43 0
|
3月前
|
设计模式 消息中间件 安全
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
36 0
|
3月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
283 5
|
3月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue
`LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。 `xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹
270 5
|
3月前
|
存储 缓存 Java
Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue
SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下: 1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。 2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。 3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先
65 5