Java阻塞队列实现原理分析

简介:

Java 阻塞队列实现原理分析


Java中的阻塞队列接口BlockingQueue继承自Queue接口。

BlockingQueue接口提供了3个添加元素方法:

  • add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;
  • offer:添加元素到队列里,添加成功返回true,添加失败返回false;
  • put:添加元素到队列里,如果容量满了会阻塞直到容量不满。

3个删除方法:

  • poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;
  • remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;
  • take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。

常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

它带有的属性如下:

 
  1. // 存储队列元素的数组,是个循环数组 
  2.  
  3. final Object[] items; 
  4.  
  5.   
  6.  
  7. // 拿数据的索引,用于take,poll,peek,remove方法 
  8.  
  9. int takeIndex; 
  10.  
  11.   
  12.  
  13. // 放数据的索引,用于put,offer,add方法 
  14.  
  15. int putIndex; 
  16.  
  17.   
  18.  
  19. // 元素个数 
  20.  
  21. int count
  22.  
  23.   
  24.  
  25. // 可重入锁 
  26.  
  27. final ReentrantLock lock; 
  28.  
  29. // notEmpty条件对象,由lock创建 
  30.  
  31. private final Condition notEmpty; 
  32.  
  33. // notFull条件对象,由lock创建 
  34.  
  35. private final Condition notFull;  

数据的添加

ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法:

 
  1. public boolean add(E e) { 
  2.  
  3.     if (offer(e)) 
  4.  
  5.         return true
  6.  
  7.     else 
  8.  
  9.         throw new IllegalStateException("Queue full"); 
  10.  
  11. }  

add方法内部调用offer方法如下:

 
  1. public boolean offer(E e) { 
  2.  
  3.     checkNotNull(e); // 不允许元素为空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程 
  8.  
  9.     try { 
  10.  
  11.         if (count == items.length) // 如果队列已满 
  12.  
  13.             return false; // 直接返回false,添加失败 
  14.  
  15.         else { 
  16.  
  17.             insert(e); // 数组没满的话调用insert方法 
  18.  
  19.             return true; // 返回true,添加成功 
  20.  
  21.         } 
  22.  
  23.     } finally { 
  24.  
  25.         lock.unlock(); // 释放锁,让其他线程可以调用offer方法 
  26.  
  27.     } 
  28.  
  29. }  

insert方法如下:

 
  1. private void insert(E x) { 
  2.  
  3.     items[putIndex] = x; // 元素添加到数组里 
  4.  
  5.     putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0 
  6.  
  7.     ++count; // 元素个数+1 
  8.  
  9.     notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知 
  10.  
  11. }  

put方法:

 
  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     checkNotNull(e); // 不允许元素为空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程 
  8.  
  9.     try { 
  10.  
  11.         while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里 
  12.  
  13.             notFull.await(); // 线程阻塞并被挂起,同时释放锁 
  14.  
  15.         insert(e); // 调用insert方法 
  16.  
  17.     } finally { 
  18.  
  19.         lock.unlock(); // 释放锁,让其他线程可以调用put方法 
  20.  
  21.     } 
  22.  
  23. }  

ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

offer方法如果队列满了,返回false,否则返回true

add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

这3个方法内部都会使用可重入锁保证原子性。

数据的删除

ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:

 
  1. public E poll() { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程 
  6.  
  7.     try { 
  8.  
  9.         return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法 
  10.  
  11.     } finally { 
  12.  
  13.         lock.unlock(); // 释放锁,让其他线程可以调用poll方法 
  14.  
  15.     } 
  16.  
  17. }  

poll方法内部调用extract方法:

 
  1. private E extract() { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素 
  6.  
  7.     items[takeIndex] = null; // 对应取索引上的数据清空 
  8.  
  9.     takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0 
  10.  
  11.     --count; // 元素个数-1 
  12.  
  13.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
  14.  
  15.     return x; // 返回元素 
  16.  
  17. }  

take方法:

 
  1. public E take() throws InterruptedException { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程 
  6.  
  7.     try { 
  8.  
  9.         while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里 
  10.  
  11.             notEmpty.await(); // 线程阻塞并被挂起,同时释放锁 
  12.  
  13.         return extract(); // 调用extract方法 
  14.  
  15.     } finally { 
  16.  
  17.         lock.unlock(); // 释放锁,让其他线程可以调用take方法 
  18.  
  19.     } 
  20.  
  21. }  

remove方法:

 
  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     final Object[] items = this.items; 
  6.  
  7.     final ReentrantLock lock = this.lock; 
  8.  
  9.     lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程 
  10.  
  11.     try { 
  12.  
  13.         for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素 
  14.  
  15.             if (o.equals(items[i])) { // 两个对象相等的话 
  16.  
  17.                 removeAt(i); // 调用removeAt方法 
  18.  
  19.                 return true; // 删除成功,返回true 
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false; // 删除成功,返回false 
  26.  
  27.     } finally { 
  28.  
  29.         lock.unlock(); // 释放锁,让其他线程可以调用remove方法 
  30.  
  31.     } 
  32.  
  33. }  

removeAt方法:

 
  1. void removeAt(int i) { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可 
  6.  
  7.         items[takeIndex] = null
  8.  
  9.         takeIndex = inc(takeIndex); 
  10.  
  11.     } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值 
  12.  
  13.         for (;;) { 
  14.  
  15.             int nexti = inc(i); 
  16.  
  17.             if (nexti != putIndex) { 
  18.  
  19.                 items[i] = items[nexti]; 
  20.  
  21.                 i = nexti; 
  22.  
  23.             } else { 
  24.  
  25.                 items[i] = null
  26.  
  27.                 putIndex = i; 
  28.  
  29.                 break; 
  30.  
  31.             } 
  32.  
  33.         } 
  34.  
  35.     } 
  36.  
  37.     --count; // 元素个数-1 
  38.  
  39.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知  
  40.  
  41. }  

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

poll方法和remove方法不会阻塞线程。

take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

LinkedBlockingQueue

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

内部使用放锁和拿锁,这两个锁实现阻塞(“two lock queue” algorithm)。

它带有的属性如下:

 
  1. // 容量大小 
  2.  
  3. private final int capacity; 
  4.  
  5.   
  6.  
  7. // 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger 
  8.  
  9. private final AtomicInteger count = new AtomicInteger(0); 
  10.  
  11.   
  12.  
  13. // 头结点 
  14.  
  15. private transient Node<E> head; 
  16.  
  17.   
  18.  
  19. // 尾节点 
  20.  
  21. private transient Node<E> last
  22.  
  23.   
  24.  
  25. // 拿锁 
  26.  
  27. private final ReentrantLock takeLock = new ReentrantLock(); 
  28.  
  29.   
  30.  
  31. // 拿锁的条件对象 
  32.  
  33. private final Condition notEmpty = takeLock.newCondition(); 
  34.  
  35.   
  36.  
  37. // 放锁 
  38.  
  39. private final ReentrantLock putLock = new ReentrantLock(); 
  40.  
  41.   
  42.  
  43. // 放锁的条件对象 
  44.  
  45. private final Condition notFull = putLock.newCondition();  

ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只能有1个被执行,不允许并行执行。

而LinkedBlockingQueue有2个锁,放锁和拿锁,添加数据和删除数据是可以并行进行的,当然添加数据和删除数据的时候只能有1个线程各自执行。

数据的添加

LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法内部调用offer方法:

 
  1. public boolean offer(E e) { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
  4.  
  5.     final AtomicInteger count = this.count
  6.  
  7.     if (count.get() == capacity) // 如果容量满了,返回false 
  8.  
  9.         return false
  10.  
  11.     int c = -1; 
  12.  
  13.     Node<E> node = new Node(e); // 容量没满,以新元素构造节点 
  14.  
  15.     final ReentrantLock putLock = this.putLock; 
  16.  
  17.     putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程 
  18.  
  19.     try { 
  20.  
  21.         if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行 
  22.  
  23.             enqueue(node); // 节点添加到链表尾部 
  24.  
  25.             c = count.getAndIncrement(); // 元素个数+1 
  26.  
  27.             if (c + 1 < capacity) // 如果容量还没满 
  28.  
  29.                 notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
  30.  
  31.         } 
  32.  
  33.     } finally { 
  34.  
  35.         putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法 
  36.  
  37.     } 
  38.  
  39.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
  40.  
  41.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
  42.  
  43.     return c >= 0; // 添加成功返回true,否则返回false 
  44.  
  45. }  

put方法:

 
  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
  4.  
  5.     int c = -1; 
  6.  
  7.     Node<E> node = new Node(e); // 以新元素构造节点 
  8.  
  9.     final ReentrantLock putLock = this.putLock; 
  10.  
  11.     final AtomicInteger count = this.count
  12.  
  13.     putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程 
  14.  
  15.     try { 
  16.  
  17.         while (count.get() == capacity) { // 如果容量满了 
  18.  
  19.             notFull.await(); // 阻塞并挂起当前线程 
  20.  
  21.         } 
  22.  
  23.         enqueue(node); // 节点添加到链表尾部 
  24.  
  25.         c = count.getAndIncrement(); // 元素个数+1 
  26.  
  27.         if (c + 1 < capacity) // 如果容量还没满 
  28.  
  29.             notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
  30.  
  31.     } finally { 
  32.  
  33.         putLock.unlock(); // 释放放锁,让其他线程可以调用put方法 
  34.  
  35.     } 
  36.  
  37.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
  38.  
  39.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
  40.  
  41. }  

LinkedBlockingQueue的添加数据方法add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。

ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。

而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

数据的删除

LinkedBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:

 
  1. public E poll() { 
  2.  
  3.     final AtomicInteger count = this.count
  4.  
  5.     if (count.get() == 0) // 如果元素个数为0 
  6.  
  7.         return null; // 返回null 
  8.  
  9.     E x = null
  10.  
  11.     int c = -1; 
  12.  
  13.     final ReentrantLock takeLock = this.takeLock; 
  14.  
  15.     takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程 
  16.  
  17.     try { 
  18.  
  19.         if (count.get() > 0) { // 判断队列里是否还有数据 
  20.  
  21.             x = dequeue(); // 删除头结点 
  22.  
  23.             c = count.getAndDecrement(); // 元素个数-1 
  24.  
  25.             if (c > 1) // 如果队列里还有元素 
  26.  
  27.                 notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
  28.  
  29.         } 
  30.  
  31.     } finally { 
  32.  
  33.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法 
  34.  
  35.     } 
  36.  
  37.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
  38.  
  39.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
  40.  
  41.                 return x; 
  42.  

take方法:

 
  1. public E take() throws InterruptedException { 
  2.  
  3.     E x; 
  4.  
  5.     int c = -1; 
  6.  
  7.     final AtomicInteger count = this.count
  8.  
  9.     final ReentrantLock takeLock = this.takeLock; 
  10.  
  11.     takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程 
  12.  
  13.     try { 
  14.  
  15.         while (count.get() == 0) { // 如果队列里已经没有元素了 
  16.  
  17.             notEmpty.await(); // 阻塞并挂起当前线程 
  18.  
  19.         } 
  20.  
  21.         x = dequeue(); // 删除头结点 
  22.  
  23.         c = count.getAndDecrement(); // 元素个数-1 
  24.  
  25.         if (c > 1) // 如果队列里还有元素 
  26.  
  27.             notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
  28.  
  29.     } finally { 
  30.  
  31.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法 
  32.  
  33.     } 
  34.  
  35.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
  36.  
  37.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
  38.  
  39.     return x; 
  40.  

remove方法:

 
  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁 
  6.  
  7.     try { 
  8.  
  9.         for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历 
  10.  
  11.              p != null
  12.  
  13.              trail = p, p = p.next) { 
  14.  
  15.             if (o.equals(p.item)) { // 判断是否找到对象 
  16.  
  17.                 unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法 
  18.  
  19.                 return true
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false
  26.  
  27.     } finally { 
  28.  
  29.         fullyUnlock(); // 2个锁解锁 
  30.  
  31.     } 
  32.  

LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。

需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。


作者:佚名

来源:51CTO

相关文章
|
1月前
|
算法 Java
java面向对象和面向过程分析
java面向对象和面向过程分析
36 0
|
1月前
|
存储 Java 编译器
java和c++的主要区别、各自的优缺点分析、java跨平台的原理的深度解析
java和c++的主要区别、各自的优缺点分析、java跨平台的原理的深度解析
65 0
|
1月前
|
安全 Java
Java并发编程:Synchronized及其实现原理
Java并发编程:Synchronized及其实现原理
23 4
|
6天前
|
Java 调度
Java中常见锁的分类及概念分析
Java中常见锁的分类及概念分析
13 0
|
6天前
|
Java
Java中ReentrantLock中tryLock()方法加锁分析
Java中ReentrantLock中tryLock()方法加锁分析
8 0
|
22天前
|
人工智能 监控 算法
java智慧城管源码 AI视频智能分析 可直接上项目
Java智慧城管源码实现AI视频智能分析,适用于直接部署项目。系统运用互联网、大数据、云计算和AI提升城市管理水平,采用“一级监督、二级指挥、四级联动”模式。功能涵盖AI智能检测(如占道广告、垃圾处理等)、执法办案、视频分析、统计分析及队伍管理等多个模块,利用深度学习优化城市管理自动化和智能化,提供决策支持。
137 4
java智慧城管源码 AI视频智能分析 可直接上项目
|
6天前
|
Java
Java中关于ConditionObject的分析
Java中关于ConditionObject的分析
15 3
|
9天前
|
设计模式 缓存 安全
分析设计模式对Java应用性能的影响,并提供优化策略
【4月更文挑战第7天】本文分析了7种常见设计模式对Java应用性能的影响及优化策略:单例模式可采用双重检查锁定、枚举实现或对象池优化;工厂方法和抽象工厂模式可通过对象池和缓存减少对象创建开销;建造者模式应减少构建步骤,简化复杂对象;原型模式优化克隆方法或使用序列化提高复制效率;适配器模式尽量减少使用,或合并多个适配器;观察者模式限制观察者数量并使用异步通知。设计模式需根据应用场景谨慎选用,兼顾代码质量和性能。
|
29天前
|
缓存 Java
java使用MAT进行内存分析
java使用MAT进行内存分析
|
2月前
|
前端开发 JavaScript Java
利用Java Web技术实现实时通信系统的案例分析
利用Java Web技术实现实时通信系统的案例分析