BlockingQueue(阻塞队列)基本使用指南

简介: BlockingQueue(阻塞队列)基本使用指南

概述


BlockingQueue 是 java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类。


BlockingQueue 的特性是在任意时刻只有一个线程可以进行 take 或者 put 操作,并且 BlockingQueue 提供了超时 return null 的机制,在许多生产场景里都可以看到这个工具的身影。


BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于 take 与 put 操作的原理,却是类似的。



队列类型

  • 无限队列(unbounded queue ) - 几乎可以无限增长
  • 有限队列(bounded queue ) - 定义了最大容量



队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 一般而言队列具备先进先出的特性,当然也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)与出队(Dequeue)



0241e632105a40f6a84cbf3300d96d07.jpg


BlockingQueue API


参考:阻塞队列BlockingQueue(JDK8)

继承关系:BlockingQueue extends Queue extends Collection



BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

  • 添加元素 的常用方法:
// 将指定的元素插入到队列的尾部,如果队列满了,那么会阻塞,直到队列中有元素被取出。注:不允许添加null元素
void put(E e)
// 如果插入成功则返回 true,若队列已满则抛出 IllegalStateException 异常  
boolean add(E e)
// 如果插入成功则返回 true,若队列已满则返回 false  
boolean offer(E e)
// 如果插入成功则返回 true,若队列已满则阻塞指定时间直到队列中有元素被取出,超时则返回 false
boolean offer(E e, long timeout, TimeUnit unit)


检索元素 的常用方法:

// 获取队列的头部元素并将其删除,在队列为空时会阻塞,直到队列中有元素可取  
E take()
// 检索并删除队列的头部,如果没有元素,则等待指定时间以使元素可用,如果超时,则返回 null
E poll(long timeout, TimeUnit unit)
// 一次性从 BlockingQueue 获取所有可用的数据对象(可以指定获取数据的个数)
// 注:可以提升获取数据效率;不需要多次分批加锁或释放锁
int drainTo(Collection<? super E> c)
int drainTo(Collection<? super E> c, int maxElements)


还可以使用 Collection(集合)接口中的方法,例如:

// 清空队列
void clear()
// 队列大小
int size()
// 可用大小
int remainingCapacity()




常见的阻塞队列



  • ArrayBlockingQueue :由数组支持的有界队列
  • LinkedBlockingQueue :由链接节点支持的可选有界队列
  • PriorityBlockingQueue :由优先级堆支持的无界优先级队列
  • DelayQueue :由优先级堆支持的、基于时间的调度队列



ArrayBlockingQueue


介绍


队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时指定。底层数组一旦创建了,容量就不能改变了,因此 ArrayBlockingQueue 是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。

数据结构如下图:



队列创建:BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

**应用场景:**在线程池中有比较多的应用,生产者消费者场景

**工作原理:**基于 ReentrantLock 保证线程安全,根据 Condition 实现队列满时的阻塞

主要变量:


// 元素数组,其长度在构造方法中指定
final Object[] items;
// 队列中实际的元素数量
int count;
// 保护所有通道的主锁
final ReentrantLock lock;
// 等待take条件的队列
private final Condition notEmpty;
// 等待put条件的队列
private final Condition notFull;



方法解析

  • put(E e) 方法
public void put(E e) throws InterruptedException {
    // 检查元素是否为null,如果是,抛出NullPointerException
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队里中的元素数量等于数组长度,则队列已满,阻塞,等待队列成为不满状态
        while (count == items.length)
            notFull.await();
        // 将元素入队
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}


put 方法总结:

   ArrayBlockingQueue 不允许添加 null 元素;

   ArrayBlockingQueue 在队列已满的时候,会调用 notFull.await(),释放锁并处于阻塞状态;

   一旦 ArrayBlockingQueue 在队列不满的时候,就立即入队。


E take() 方法


public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队列中元素数量为0时,则进入阻塞状态
        while (count == 0)
            notEmpty.await();
        // 队列不为空是,调用dequeue()出队
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}


  • take 方法总结:
  • 取元素时,若队列为空,则调用 notEmpty.await(),进入阻塞状态,直至不为空时,调用 dequeue() 方法出队。

LinkedBlockingQueue

介绍


是一个基于链表的无界队列(理论上有界),向无限队列添加元素的所有操作正常情况下不会阻塞,因此它可以增长到非常大的容量。


   队列创建:BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

   blockingQueue 如果在初始化时没有指定容量,那么容量默认为 Integer.MAX_VALUE

   注意:使用无限 BlockingQueue 设计生产者 - 消费者模型时,消费者需要能够像生产者向队列添加消息一样快地消费消息。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。


   底层数据结构

   LinkedBlockingQueue 内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。


   如果队列为空时,头节点的 next 参数为null,尾节点的 next 参数也为null

   主要变量


// 容量限制,如果没有指定,则为 Integer.MAX_VALUE
private final int capacity;
// 当前队列中的元素数量
// count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,始终满足head.item == null
transient Node<E> head;
// 队列的尾节点,始终满足last.next == null
private transient Node<E> last;
// 由 take、poll 等持有的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();
// 由 put、offer 等持有的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();


方法解析

  • put(E e) 方法
// 在此队列的尾部插入指定元素,如有必要,等待空间可用。
public void put(E e) throws InterruptedException {
    // 不允许元素为null
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 以当前元素新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获得入队的锁
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,那么将该线程加入到Condition的等待队列中
        while (count.get() == capacity) {
            notFull.await();
        }
        // 当队列未满,然后有出队线程取出导致,将节点入队
        enqueue(node);
        // 得到插入之前队列的元素个数。getAndIncrement返回的是 +1 前的值
        c = count.getAndIncrement();
        // 如果还可以插入元素,那么释放等待的入队线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放入队的锁
        putLock.unlock();
    }
    // 如果插入队列之前元素个数为0,插入后就通知出队线程队列非空
    if (c == 0)
        signalNotEmpty();
}


put 方法总结:


   LinkedBlockingQueue 不允许插入的元素为 null;


   同一时刻只有一个线程可以进行入队操作,putLock 在将元素插入队列尾部时加锁了;


   如果队列满了,则会调用 notFull.await(),将该线程加入到 Condition 等待队列中。await 方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;


   一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到 Condition 队列中的将会被释放,那么该线程将会重新获得 put 锁,继而执行 enqueue() 方法,将节点插入到队列的尾部;


   然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知 notFull 条件的等待队列中的线程;


   如果插入队列前个数为 0,那现在插入后,就为 1 了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。


E take() 方法


public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列中元素数量为0,则将出队线程加入到notEmpty队列中进行等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 得到到队头的元素
        x = dequeue();
        // 得到出队列前元素的个数。getAndDecrement返回的是 -1 前的值
        c = count.getAndDecrement();
        // 如果出队列前的元素数量大于1,那说明还可以继续取,那就释放在notEmpty队列的第一个线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放出队锁
        takeLock.unlock();
    }
    // 如果出队前队列是满的,那现在取走一个元素了,队列就不满了,就可以去通知等待中的入队线程了。
    if (c == capacity)
        signalNotFull();
    return x;
}


take方法总结:


   同一时刻只有一个线程可以进行出队操作,takeLock 在出队之前加锁了;


   如果队列中元素为空,那就进入 notEmpty 队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下 notEmpty 队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在 notFull 队列中等待插入的线程进行 put 了。


remove() 方法


用于删除队列中一个元素,如果队列中不含有该元素,那么返回 false ;有的话则删除并返回true。

入队和出队都是只获取一个锁,而 remove()方法需要同时获得两把锁。

public boolean remove(Object o) {
    // 因为队列不包含null元素,返回false
    if (o == null) return false;
    // 获取两把锁
    fullyLock();
    try {
        // 从头的下一个节点开始遍历
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果匹配,那么将节点从队列中移除,trail表示需要删除节点的前一节点
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 释放两把锁
        fullyUnlock();
    }
}
/**
 * 锁定以防止 put 和 take.
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
/**
 * 解锁以允许 put 和 take.
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}


DelayQueue


由优先级堆支持的、基于时间的调度队列,内部基于无界队列 PriorityQueue 实现,而无界队列基于数组的扩容实现。


   队列创建:BlockingQueue<String> blockingQueue = new DelayQueue();

   要求:入队的对象必须要实现 Delayed 接口,而 Delayed 集成自 Comparable 接口

   应用场景:电影票

   工作原理:队列内部会根据时间优先级进行排序。延迟类线程池周期执行。




LinkedBlockingQueue 和 ArrayBlockingQueue 的区别


  • 底层实现不同


LinkedBlockingQueue 底层实现是链表,ArrayBlockingQueue 底层实现是数组


队列容量

LinkedBlockingQueue 默认的队列长度是 Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue 高很多;


ArrayBlockingQueue 必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。


锁的数量

LinkedBlockingQueue 有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。


ArrayBlockingQueue 只有一把锁,同时只能有一个线程进行入队和出队操作。

相关文章
|
前端开发 Java 开发者
深入理解Spring Boot中的@Service注解
【4月更文挑战第22天】在 Spring Boot 应用开发中,@Service 注解扮演着特定的角色,主要用于标识服务层组件。本篇技术博客将全面探讨 @Service 注解的概念,并提供实际的应用示例,帮助开发者理解如何有效地使用这一注解来优化应用的服务层架构
2711 1
|
数据采集 前端开发 JavaScript
查看Socket断开原因及加入心跳机制防止自动断开连接
一般情况下,前端页面连接WebSocket服务的时候都是通过Nginx等负载均衡,然后由Nginx去代理连接后端的socket服务。如果建立连接之后不做一些措施,那么可能会有各种各样的原因会导致socket断开。
2874 0
|
API 开发者
工作日和节假日api
节假日api核心服务托管在阿里云之上,API天然分布式、高可用。
|
10月前
|
SQL 关系型数据库 MySQL
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 `EXPLAIN` 语句用于分析和优化 SQL 查询,帮助你了解查询优化器的执行计划。本文详细介绍了 `EXPLAIN` 输出的各项指标,如 `id`、`select_type`、`table`、`type`、`key` 等,并提供了如何利用这些指标优化索引结构和 SQL 语句的具体方法。通过实战案例,展示了如何通过创建合适索引和调整查询语句来提升查询性能。
1890 10
|
11月前
|
Java Scala Kotlin
SpringBoot 读取配置的几种方式
本文介绍了SpringBoot中读取配置文件的几种方法,包括使用`@Value`、`Environment`和`@ConfigurationProperties`注解,以及如何通过`@PropertySource`指定配置文件位置和编码。还讲解了如何自定义工厂类以支持读取`.yaml`文件。
351 0
|
12月前
|
缓存 easyexcel Java
Java EasyExcel 导出报内存溢出如何解决
大家好,我是V哥。使用EasyExcel进行大数据量导出时容易导致内存溢出,特别是在导出百万级别的数据时。以下是V哥整理的解决该问题的一些常见方法,包括分批写入、设置合适的JVM内存、减少数据对象的复杂性、关闭自动列宽设置、使用Stream导出以及选择合适的数据导出工具。此外,还介绍了使用Apache POI的SXSSFWorkbook实现百万级别数据量的导出案例,帮助大家更好地应对大数据导出的挑战。欢迎一起讨论!
1308 1
|
10月前
|
运维 监控 Java
为何内存不够用?微服务改造启动多个Spring Boot的陷阱与解决方案
本文记录并复盘了生产环境中Spring Boot应用内存占用过高的问题及解决过程。系统上线初期运行正常,但随着业务量上升,多个Spring Boot应用共占用了64G内存中的大部分,导致应用假死。通过jps和jmap工具排查发现,原因是运维人员未设置JVM参数,导致默认配置下每个应用占用近12G内存。最终通过调整JVM参数、优化堆内存大小等措施解决了问题。建议在生产环境中合理设置JVM参数,避免资源浪费和性能问题。
600 3
|
存储 安全 Java
深入探索Java并发编程:ArrayBlockingQueue详解
深入探索Java并发编程:ArrayBlockingQueue详解
|
网络协议 Java
JAVA实现心跳检测【长连接】
这篇文章介绍了Java中实现心跳检测机制的方法,包括心跳机制的简介、实现方式、客户端和服务端的代码实现,以及具体的测试结果。文中详细阐述了如何通过自定义心跳包和超时检测来维持长连接,并提供了完整的客户端和服务端示例代码。
JAVA实现心跳检测【长连接】
|
11月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!