Java阻塞队列实现原理分析

简介:

Java 阻塞队列实现原理分析


Java中的阻塞队列接口BlockingQueue继承自Queue接口。

BlockingQueue接口提供了3个添加元素方法:

  • add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;
  • offer:添加元素到队列里,添加成功返回true,添加失败返回false;
  • put:添加元素到队列里,如果容量满了会阻塞直到容量不满。

3个删除方法:

  • poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;
  • remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;
  • take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。

常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

它带有的属性如下:


 
 
  1. // 存储队列元素的数组,是个循环数组 
  2.  
  3. final Object[] items; 
  4.  
  5.   
  6.  
  7. // 拿数据的索引,用于take,poll,peek,remove方法 
  8.  
  9. int takeIndex; 
  10.  
  11.   
  12.  
  13. // 放数据的索引,用于put,offer,add方法 
  14.  
  15. int putIndex; 
  16.  
  17.   
  18.  
  19. // 元素个数 
  20.  
  21. int count
  22.  
  23.   
  24.  
  25. // 可重入锁 
  26.  
  27. final ReentrantLock lock; 
  28.  
  29. // notEmpty条件对象,由lock创建 
  30.  
  31. private final Condition notEmpty; 
  32.  
  33. // notFull条件对象,由lock创建 
  34.  
  35. private final Condition notFull;  

数据的添加

ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法:


 
 
  1. public boolean add(E e) { 
  2.  
  3.     if (offer(e)) 
  4.  
  5.         return true
  6.  
  7.     else 
  8.  
  9.         throw new IllegalStateException("Queue full"); 
  10.  
  11. }  

add方法内部调用offer方法如下:


 
 
  1. public boolean offer(E e) { 
  2.  
  3.     checkNotNull(e); // 不允许元素为空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程 
  8.  
  9.     try { 
  10.  
  11.         if (count == items.length) // 如果队列已满 
  12.  
  13.             return false; // 直接返回false,添加失败 
  14.  
  15.         else { 
  16.  
  17.             insert(e); // 数组没满的话调用insert方法 
  18.  
  19.             return true; // 返回true,添加成功 
  20.  
  21.         } 
  22.  
  23.     } finally { 
  24.  
  25.         lock.unlock(); // 释放锁,让其他线程可以调用offer方法 
  26.  
  27.     } 
  28.  
  29. }  

insert方法如下:


 
 
  1. private void insert(E x) { 
  2.  
  3.     items[putIndex] = x; // 元素添加到数组里 
  4.  
  5.     putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0 
  6.  
  7.     ++count; // 元素个数+1 
  8.  
  9.     notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知 
  10.  
  11. }  

put方法:


 
 
  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     checkNotNull(e); // 不允许元素为空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程 
  8.  
  9.     try { 
  10.  
  11.         while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里 
  12.  
  13.             notFull.await(); // 线程阻塞并被挂起,同时释放锁 
  14.  
  15.         insert(e); // 调用insert方法 
  16.  
  17.     } finally { 
  18.  
  19.         lock.unlock(); // 释放锁,让其他线程可以调用put方法 
  20.  
  21.     } 
  22.  
  23. }  

ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

offer方法如果队列满了,返回false,否则返回true

add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

这3个方法内部都会使用可重入锁保证原子性。

数据的删除

ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:


 
 
  1. public E poll() { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程 
  6.  
  7.     try { 
  8.  
  9.         return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法 
  10.  
  11.     } finally { 
  12.  
  13.         lock.unlock(); // 释放锁,让其他线程可以调用poll方法 
  14.  
  15.     } 
  16.  
  17. }  

poll方法内部调用extract方法:


 
 
  1. private E extract() { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素 
  6.  
  7.     items[takeIndex] = null; // 对应取索引上的数据清空 
  8.  
  9.     takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0 
  10.  
  11.     --count; // 元素个数-1 
  12.  
  13.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
  14.  
  15.     return x; // 返回元素 
  16.  
  17. }  

take方法:


 
 
  1. public E take() throws InterruptedException { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程 
  6.  
  7.     try { 
  8.  
  9.         while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里 
  10.  
  11.             notEmpty.await(); // 线程阻塞并被挂起,同时释放锁 
  12.  
  13.         return extract(); // 调用extract方法 
  14.  
  15.     } finally { 
  16.  
  17.         lock.unlock(); // 释放锁,让其他线程可以调用take方法 
  18.  
  19.     } 
  20.  
  21. }  

remove方法:


 
 
  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     final Object[] items = this.items; 
  6.  
  7.     final ReentrantLock lock = this.lock; 
  8.  
  9.     lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程 
  10.  
  11.     try { 
  12.  
  13.         for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素 
  14.  
  15.             if (o.equals(items[i])) { // 两个对象相等的话 
  16.  
  17.                 removeAt(i); // 调用removeAt方法 
  18.  
  19.                 return true; // 删除成功,返回true 
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false; // 删除成功,返回false 
  26.  
  27.     } finally { 
  28.  
  29.         lock.unlock(); // 释放锁,让其他线程可以调用remove方法 
  30.  
  31.     } 
  32.  
  33. }  

removeAt方法:


 
 
  1. void removeAt(int i) { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可 
  6.  
  7.         items[takeIndex] = null
  8.  
  9.         takeIndex = inc(takeIndex); 
  10.  
  11.     } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值 
  12.  
  13.         for (;;) { 
  14.  
  15.             int nexti = inc(i); 
  16.  
  17.             if (nexti != putIndex) { 
  18.  
  19.                 items[i] = items[nexti]; 
  20.  
  21.                 i = nexti; 
  22.  
  23.             } else { 
  24.  
  25.                 items[i] = null
  26.  
  27.                 putIndex = i; 
  28.  
  29.                 break; 
  30.  
  31.             } 
  32.  
  33.         } 
  34.  
  35.     } 
  36.  
  37.     --count; // 元素个数-1 
  38.  
  39.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知  
  40.  
  41. }  

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

poll方法和remove方法不会阻塞线程。

take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

LinkedBlockingQueue

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

内部使用放锁和拿锁,这两个锁实现阻塞(“two lock queue” algorithm)。

它带有的属性如下:


 
 
  1. // 容量大小 
  2.  
  3. private final int capacity; 
  4.  
  5.   
  6.  
  7. // 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger 
  8.  
  9. private final AtomicInteger count = new AtomicInteger(0); 
  10.  
  11.   
  12.  
  13. // 头结点 
  14.  
  15. private transient Node<E> head; 
  16.  
  17.   
  18.  
  19. // 尾节点 
  20.  
  21. private transient Node<E> last
  22.  
  23.   
  24.  
  25. // 拿锁 
  26.  
  27. private final ReentrantLock takeLock = new ReentrantLock(); 
  28.  
  29.   
  30.  
  31. // 拿锁的条件对象 
  32.  
  33. private final Condition notEmpty = takeLock.newCondition(); 
  34.  
  35.   
  36.  
  37. // 放锁 
  38.  
  39. private final ReentrantLock putLock = new ReentrantLock(); 
  40.  
  41.   
  42.  
  43. // 放锁的条件对象 
  44.  
  45. private final Condition notFull = putLock.newCondition();  

ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只能有1个被执行,不允许并行执行。

而LinkedBlockingQueue有2个锁,放锁和拿锁,添加数据和删除数据是可以并行进行的,当然添加数据和删除数据的时候只能有1个线程各自执行。

数据的添加

LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法内部调用offer方法:


 
 
  1. public boolean offer(E e) { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
  4.  
  5.     final AtomicInteger count = this.count
  6.  
  7.     if (count.get() == capacity) // 如果容量满了,返回false 
  8.  
  9.         return false
  10.  
  11.     int c = -1; 
  12.  
  13.     Node<E> node = new Node(e); // 容量没满,以新元素构造节点 
  14.  
  15.     final ReentrantLock putLock = this.putLock; 
  16.  
  17.     putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程 
  18.  
  19.     try { 
  20.  
  21.         if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行 
  22.  
  23.             enqueue(node); // 节点添加到链表尾部 
  24.  
  25.             c = count.getAndIncrement(); // 元素个数+1 
  26.  
  27.             if (c + 1 < capacity) // 如果容量还没满 
  28.  
  29.                 notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
  30.  
  31.         } 
  32.  
  33.     } finally { 
  34.  
  35.         putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法 
  36.  
  37.     } 
  38.  
  39.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
  40.  
  41.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
  42.  
  43.     return c >= 0; // 添加成功返回true,否则返回false 
  44.  
  45. }  

put方法:


 
 
  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
  4.  
  5.     int c = -1; 
  6.  
  7.     Node<E> node = new Node(e); // 以新元素构造节点 
  8.  
  9.     final ReentrantLock putLock = this.putLock; 
  10.  
  11.     final AtomicInteger count = this.count
  12.  
  13.     putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程 
  14.  
  15.     try { 
  16.  
  17.         while (count.get() == capacity) { // 如果容量满了 
  18.  
  19.             notFull.await(); // 阻塞并挂起当前线程 
  20.  
  21.         } 
  22.  
  23.         enqueue(node); // 节点添加到链表尾部 
  24.  
  25.         c = count.getAndIncrement(); // 元素个数+1 
  26.  
  27.         if (c + 1 < capacity) // 如果容量还没满 
  28.  
  29.             notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
  30.  
  31.     } finally { 
  32.  
  33.         putLock.unlock(); // 释放放锁,让其他线程可以调用put方法 
  34.  
  35.     } 
  36.  
  37.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
  38.  
  39.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
  40.  
  41. }  

LinkedBlockingQueue的添加数据方法add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。

ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。

而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

数据的删除

LinkedBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:


 
 
  1. public E poll() { 
  2.  
  3.     final AtomicInteger count = this.count
  4.  
  5.     if (count.get() == 0) // 如果元素个数为0 
  6.  
  7.         return null; // 返回null 
  8.  
  9.     E x = null
  10.  
  11.     int c = -1; 
  12.  
  13.     final ReentrantLock takeLock = this.takeLock; 
  14.  
  15.     takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程 
  16.  
  17.     try { 
  18.  
  19.         if (count.get() > 0) { // 判断队列里是否还有数据 
  20.  
  21.             x = dequeue(); // 删除头结点 
  22.  
  23.             c = count.getAndDecrement(); // 元素个数-1 
  24.  
  25.             if (c > 1) // 如果队列里还有元素 
  26.  
  27.                 notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
  28.  
  29.         } 
  30.  
  31.     } finally { 
  32.  
  33.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法 
  34.  
  35.     } 
  36.  
  37.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
  38.  
  39.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
  40.  
  41.                 return x; 
  42.  

take方法:


 
 
  1. public E take() throws InterruptedException { 
  2.  
  3.     E x; 
  4.  
  5.     int c = -1; 
  6.  
  7.     final AtomicInteger count = this.count
  8.  
  9.     final ReentrantLock takeLock = this.takeLock; 
  10.  
  11.     takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程 
  12.  
  13.     try { 
  14.  
  15.         while (count.get() == 0) { // 如果队列里已经没有元素了 
  16.  
  17.             notEmpty.await(); // 阻塞并挂起当前线程 
  18.  
  19.         } 
  20.  
  21.         x = dequeue(); // 删除头结点 
  22.  
  23.         c = count.getAndDecrement(); // 元素个数-1 
  24.  
  25.         if (c > 1) // 如果队列里还有元素 
  26.  
  27.             notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
  28.  
  29.     } finally { 
  30.  
  31.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法 
  32.  
  33.     } 
  34.  
  35.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
  36.  
  37.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
  38.  
  39.     return x; 
  40.  

remove方法:


 
 
  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁 
  6.  
  7.     try { 
  8.  
  9.         for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历 
  10.  
  11.              p != null
  12.  
  13.              trail = p, p = p.next) { 
  14.  
  15.             if (o.equals(p.item)) { // 判断是否找到对象 
  16.  
  17.                 unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法 
  18.  
  19.                 return true
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false
  26.  
  27.     } finally { 
  28.  
  29.         fullyUnlock(); // 2个锁解锁 
  30.  
  31.     } 
  32.  

LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。

需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。


作者:佚名

来源:51CTO

相关文章
|
2月前
|
存储 Java
【编程基础知识】 分析学生成绩:用Java二维数组存储与输出
本文介绍如何使用Java二维数组存储和处理多个学生的各科成绩,包括成绩的输入、存储及格式化输出,适合初学者实践Java基础知识。
94 1
|
2天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
15 6
|
21天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
25天前
|
监控 算法 Java
jvm-48-java 变更导致压测应用性能下降,如何分析定位原因?
【11月更文挑战第17天】当JVM相关变更导致压测应用性能下降时,可通过检查变更内容(如JVM参数、Java版本、代码变更)、收集性能监控数据(使用JVM监控工具、应用性能监控工具、系统资源监控)、分析垃圾回收情况(GC日志分析、内存泄漏检查)、分析线程和锁(线程状态分析、锁竞争分析)及分析代码执行路径(使用代码性能分析工具、代码审查)等步骤来定位和解决问题。
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
66 2
|
1月前
|
Java 关系型数据库 数据库
面向对象设计原则在Java中的实现与案例分析
【10月更文挑战第25天】本文通过Java语言的具体实现和案例分析,详细介绍了面向对象设计的五大核心原则:单一职责原则、开闭原则、里氏替换原则、接口隔离原则和依赖倒置原则。这些原则帮助开发者构建更加灵活、可维护和可扩展的系统,不仅适用于Java,也适用于其他面向对象编程语言。
38 2
|
2月前
|
Java
让星星⭐月亮告诉你,Java synchronized(*.class) synchronized 方法 synchronized(this)分析
本文通过Java代码示例,介绍了`synchronized`关键字在类和实例方法上的使用。总结了三种情况:1) 类级别的锁,多个实例对象在同一时刻只能有一个获取锁;2) 实例方法级别的锁,多个实例对象可以同时执行;3) 同一实例对象的多个线程,同一时刻只能有一个线程执行同步方法。
22 1
|
2月前
|
小程序 Oracle Java
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
这篇文章是关于JVM基础知识的介绍,包括JVM的跨平台和跨语言特性、Class文件格式的详细解析,以及如何使用javap和jclasslib工具来分析Class文件。
59 0
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
|
2月前
|
Java
如何从Java字节码角度分析问题|8月更文挑战
如何从Java字节码角度分析问题|8月更文挑战
|
2月前
|
存储 Java 编译器
[Java]基本数据类型与引用类型赋值的底层分析
本文详细分析了Java中不同类型引用的存储方式,包括int、Integer、int[]、Integer[]等,并探讨了byte与其他类型间的转换及String的相关特性。文章通过多个示例解释了引用和对象的存储位置,以及字符串常量池的使用。此外,还对比了String和StringBuilder的性能差异,帮助读者深入理解Java内存管理机制。
28 0