JAVA并发之阻塞队列浅析

简介: JAVA并发之阻塞队列浅析背景因为在工作中经常会用到阻塞队列,有的时候还要根据业务场景获取重写阻塞队列中的方法,所以学习一下阻塞队列的实现原理还是很有必要的。(PS:不深入了解的话,很容易使用出错,造成没有技术深度的样子)阻塞队列是什么?要想了解阻塞队列,先了解一下队列是啥,简单的说队列就是一种先进先出的数据结构。

JAVA并发之阻塞队列浅析
背景
因为在工作中经常会用到阻塞队列,有的时候还要根据业务场景获取重写阻塞队列中的方法,所以学习一下阻塞队列的实现原理还是很有必要的。(PS:不深入了解的话,很容易使用出错,造成没有技术深度的样子)

阻塞队列是什么?
要想了解阻塞队列,先了解一下队列是啥,简单的说队列就是一种先进先出的数据结构。(具体的内容去数据结构里学习一下)所以阻塞队列就是一种可阻塞的队列。和普通的队列的不同就体现在 ”阻塞“两个字上。阻塞是啥意思?

百度看一下

在软件工程里阻塞一般指的是阻塞调用,即调用结果返回之前,当前线程会被挂起。函数只有在得到结果之后才会返回。

阻塞队列其实就是普通的队列根据需要将某些方法改为阻塞调用。所以阻塞队里和普通队里的不同主要体现在两个方面

当队列是空的时,从队列中获取元素的操作将会被阻塞 。直到其他的线程往空的队列插入新的元素
当队列是满时,往队列里添加元素的操作会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列
为什么要使用阻塞队列?

那么为什么要使用阻塞队列?阻塞队列又能完成什么特殊的任务吗?

阻塞队列的经典使用 场景就是“生产者”和“消费者”模型,生产者生产数据,放入队列,然后消费从队列中获取数据,这个在一般情况下自然没有问题,但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?

    在出现消费者速度远大于生产者速度,消费者在数据消费至一定程度的情况下,暂停等待一下(阻塞消费者)来等待生产者,以保证生产者能够生产出新的数据;反之亦然。

阻塞队列在java中的一种典型使用场景是线程池,在线程池中,当提交的任务不能被立即得到执行的时候,线程池就会将提交的任务放到一个阻塞的任务队列中来(线程池的具体使用参见之前写的一篇文章《java并发之线程池的浅析》)

然而,在阻塞队列发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。在这里要感谢一下concurrent包,减轻了我们很多工作

阻塞队列的成员有哪些

下面分别简单介绍一下:

ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。构造时必须传入的参数是数组大小此外还可以指定是否公平性。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】;在插入或删除元素时不会产生或销毁任何额外的对象实例

LinkedBlockingQueue:一个由链表结构组成的有界队列,照先进先出的顺序进行排序 ,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE。。【PS:如果生产者的速度远远大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。】PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
LinkedBlockingQueue之所以能够高效的处理并发数据,是因为take()方法和put(E param)方法使用了不同的可重入锁,分别为private final ReentrantLock putLock和private final ReentrantLock takeLock,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
LinkedBlockingQueue在插入元素是会创建一个额外的Node对象,所以它这在长时间内需要高效并发地处理大批量数据的系统中,对于GC的还是存在一定的影响。
DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

阻塞队列的核心方法
阻塞对队列的核心方法主要是插入操作操作和取出操作,如下

Throws Exception 类型的插入和取出在不能立即被执行的时候就会抛出异常。
Special Value 类型的插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false 或者null)
Blocked 类型的插入和取出操作在不能被立即执行的时候会阻塞线程直到可以操作的时候会被其他线程唤醒
Timed out 类型的插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值
插入操作
boolean offer(E e):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。(本方法不阻塞当前执行方法的线程)。      
boolean offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在设置的指定的时间内,还不能往队列中加入BlockingQueue,则返回false。
void put(E paramE) throws InterruptedException:将指定元素插入到此队列中里,如果队列没有空间,则调用此方法的线程被阻断直到队列里里面有空间再继续执行插入操作。
public boolean add(E e): 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException(其实就是调用了offer方法)。
复制代码
public boolean add(E e) {

    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");

}
复制代码
获取操作
poll():取走BlockingQueue里排在首位的对象,,取不到时返回null;
poll(long timeout, TimeUnit unit):在指定时间内从BlockingQueue取出一个队首的对象,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回null。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo(Collection<? super E> c, int maxElements):一次性从BlockingQueue获取所有可用的数据对象,将数据对象加入传递的集合中(还可以通过maxElements指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁
阻塞队列的实现原理

前面介绍了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以比较常用的ArrayBlockingQueue为例,其他阻塞队列实现原理根据特性会和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

首先看一下ArrayBlockingQueue的几个关键成员变量

复制代码
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable {

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;
/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

}
复制代码
从上边可以明显的看出ArrayBlockingQueue用一个数组来存储数据,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。 lock是一个可重入锁,notEmpty和notFull是等待条件。

然后看它的一个关键方法的实现:put()

复制代码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
    while (count == items.length)
        notFull.await();
    enqueue(e);
} finally {
  lock.unlock();
}

}
复制代码
首选检查元素是否为空,为空则抛出异常
接着实例化可重入锁
然后localReentrantLock.lockInterruptibly();这里特别强调一下 (lockInterruptibly()允许在等待时由其他线程的Thread.interrupt()方法来中断等待线程而直接返回,这时是不用获取锁的,而会抛出一个InterruptException。 而ReentrantLock.lock()方法则不允许Thread.interrupt()中断,即使检测到了Thread.interruptted一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功之后在把当前线程置为中断状态)
判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,即当队列满的时候,将会等待
将元素插入到队列中
解锁(这里一定要在finally中解锁啊!!!)
enqueue(E x)将元素插入到数组啊item中

复制代码
/**

 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

复制代码
该方法内部通过putIndex索引直接将元素添加到数组items中

这里思考一个问题 为什么当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0?

这是因为当队列是先进先出的 所以获取元素总是从队列头部获取,而添加元素从中从队列尾部获取。所以当队列索引(从0开始)与数组长度相等时,所以下次我们就需要从数组头部开始添加了;

最后当插入成功后,通过notEmpty唤醒正在等待取元素的线程

阻塞队列中和put对应的就是take了

下边是take方法的实现

复制代码
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
      while (count == 0)
       notEmpty.await();
        return dequeue();
 finally {
    lock.unlock();
 }

}
复制代码
take方法其实很简单,队列中有数据就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程;

可以看到take的实现跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。(等的就是上文的put中的信号)当数组的数量为空时,也就是无任何数据可以被取出来的时候,notEmpty这个Condition就会进行阻塞,直到被notEmpty唤醒

dequeue的实现如下

复制代码
private E dequeue() {

    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

复制代码
take方法主要是从队列头部取元素,可以看到takeIndex是取元素的时候的偏移值,而put中是putIndex控制添加元素的偏移量,由此可见,put和take操作的偏移量分别是由putIndex和takeIndex控制的。其实仔细观察put和take的实现思路是有很多相似之处。

offer(E o, long timeout, TimeUnit unit)的实现方式其实和put的思想是差不多的区别是 offer在阻塞的时候调用的不是await()方法而是awaitNanos(long nanosTimeout) 带超时响应的等待(PS:具体区别可以参考我之前写的关于锁的博客《JAVA并发之锁的使用浅析》)
poll(long timeout, TimeUnit unit)的实现也是这样在take的基础上加了超时响应。感兴趣的朋友可以自行去看一下
案例分析
模拟食堂的经历,食堂窗口端出一道菜放在台面,然后等待顾客消费。写到代码里就是食堂窗口就是一个生产者线程,顾客就是消费者线程,台面就是阻塞队列。

  • View Code
    结果部分如下
 可以看到当生产者产生的数据达到阻塞队列的容量时,生成者线程会阻塞,等待消费者线程进行消费,上述案例中最大容量为8个盘子,所以当食堂做好了8个菜后了8会等待顾客进行消费,消费后继续生产。上述案例使用阻塞队列,看起来代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

 在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

 阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,如线程池中就使用了阻塞队列,其实只要符合生产者-消费者模型的都可以使用阻塞队列。

参考资料:

《Java编程思想》

https://www.cnblogs.com/dolphin0520/p/3932906.html

https://www.cnblogs.com/superfj/p/7757876.html
原文地址https://www.cnblogs.com/NathanYang/p/11276428.html

相关文章
|
3月前
|
安全 Java 编译器
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
69 2
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
3月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
67 1
|
17天前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
22 1
|
2月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
49 5
|
3月前
|
存储 算法 Java
Java 中的同步集合和并发集合
【8月更文挑战第22天】
39 5
|
3月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
50 2
|
3月前
|
Java 开发者
【编程高手必备】Java多线程编程实战揭秘:解锁高效并发的秘密武器!
【8月更文挑战第22天】Java多线程编程是提升软件性能的关键技术,可通过继承`Thread`类或实现`Runnable`接口创建线程。为确保数据一致性,可采用`synchronized`关键字或`ReentrantLock`进行线程同步。此外,利用`wait()`和`notify()`方法实现线程间通信。预防死锁策略包括避免嵌套锁定、固定锁顺序及设置获取锁的超时。掌握这些技巧能有效增强程序的并发处理能力。
25 2
|
4月前
|
Java 开发者
Java中的多线程与并发控制
【7月更文挑战第31天】在Java的世界中,多线程是提升程序性能和响应能力的关键。本文将通过实际案例,深入探讨Java多线程的创建、同步机制以及并发包的使用,旨在帮助读者理解并掌握如何在Java中高效地实现多线程编程。
49 3