java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

简介:

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口

BlockingQueue实现类常见的有以下几种。

  1. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。


  2. DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。


  3. LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限


  4. PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现


  5. SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。


BlockingQueue接口提供了

3个添加元素方法

  1. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常

  2. offer:添加元素到队列里,添加成功返回true,添加失败返回false

  3. put:添加元素到队列里,如果容量满了会阻塞直到容量不满

3个删除方法。

  1. poll:删除队列头部元素,如果队列为空,返回null。否则返回元素。

  2. remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false

  3. take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除



例子:生产者和消费者非常适合阻塞队列,其实我也弄过redis作为生产者和消费者模式,redis的list非常适合做队列,生产者放入队列和消费者从队列里取出,同时也提供阻塞的取出等。先回归到java的阻塞队列里,用LinkedBlockingQueue来做这个例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package  com.basic.test;
 
import  java.util.Random;
import  java.util.concurrent.BlockingQueue;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.LinkedBlockingQueue;
 
/**
  * Created by sdc on 2017/6/9.
  */
public  class  BlockingQueueTest {
 
     public  static  void  main(String[] args) {
         BlockingQueue<Integer> blockingQueue =  new  LinkedBlockingQueue<Integer>( 10 );
         Producter producer =  new  Producter(blockingQueue);
         Consumer consumer =  new  Consumer(blockingQueue);
 
         //创建5个生产者,5个消费者
         for  ( int  i =  0 ; i <  10 ; i++) {
             if  (i <  5 ) {
                 new  Thread(producer,  "producer"  + i).start();
             else  {
                 new  Thread(consumer,  "consumer"  + (i -  5 )).start();
             }
         }
 
         try  {
             Thread.sleep( 1000 );
         catch  (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
         producer.shutDown();
         consumer.shutDown();
     }
 
 
     static  class  Producter  implements  Runnable {
 
         private  final  BlockingQueue<Integer> blockingQueue;
 
         private  volatile  boolean  flag;
 
         private  Random random;
 
         public  Producter(BlockingQueue<Integer> blockingQueue) {
             this .blockingQueue = blockingQueue;
             flag =  false ;
             random =  new  Random();
 
         }
 
         @Override
         public  void  run() {
             while  (!flag) {
                 int  info = random.nextInt( 100 );
                 try  {
                     blockingQueue.put(info);
                     System.out.println(Thread.currentThread().getName() +  "produce"  + info);
                     Thread.sleep( 50 );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
         }
 
         public  void  shutDown() {
             flag =  true ;
         }
     }
 
     static  class  Consumer  implements  Runnable {
 
         private  final  BlockingQueue<Integer> blockingQueue;
 
         private  volatile  boolean  flag;
 
         public  Consumer(BlockingQueue<Integer> blockingQueue) {
             this .blockingQueue = blockingQueue;
         }
 
         @Override
         public  void  run() {
             while  (!flag) {
                 int  info;
                 try  {
                     info = blockingQueue.take();
                     System.out.println(Thread.currentThread().getName() +  " consumer "  + info);
                     Thread.sleep( 50 );
                 catch  (InterruptedException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
         }
 
         public  void  shutDown() {
             flag =  true ;
         }
     }
 
}

LinkedBlockingQueue

LinkedBlockingQueue队列是一个使用链表完成的阻塞队列,链表是单向的。

内部用了两个锁,takeLock,putLock,添加数据和删除数据都是并行执行的,当然添加数据和删除数据的时候只能有1个线程各自执行。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 容量大小
private  final  int  capacity;
 
// 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger
private  final  AtomicInteger count =  new  AtomicInteger( 0 );
 
// 头结点
private  transient  Node<E> head;
 
// 尾节点
private  transient  Node<E> last;
 
// 拿锁
private  final  ReentrantLock takeLock =  new  ReentrantLock();
 
// 拿锁的条件对象
private  final  Condition notEmpty = takeLock.newCondition();
 
// 放锁
private  final  ReentrantLock putLock =  new  ReentrantLock();
 
// 放锁的条件对象
private  final  Condition notFull = putLock.newCondition();


LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
offer与put都是添加元素到queue的尾部, 只不过 put 方法在队列满时会进行阻塞, 直到成功; 
而 offer 操作在容量满时直接返回  false .
 
public  boolean  offer(E e) {
     if  (e ==  null throw  new  NullPointerException();  // 不允许空元素
     final  AtomicInteger count =  this .count;
     if  (count.get() == capacity)  // 如果容量满了,返回false
         return  false ;
     int  c = - 1 ;
     Node<E> node =  new  Node(e);  // 容量没满,以新元素构造节点
     final  ReentrantLock putLock =  this .putLock;
     putLock.lock();  // 放锁加锁,保证调用offer方法的时候只有1个线程
     try  {
         if  (count.get() < capacity) {  // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行
             enqueue(node);  // 节点添加到链表尾部
             c = count.getAndIncrement();  // 元素个数+1
             if  (c +  1  < capacity)  // 如果容量还没满
                 notFull.signal();  // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
         }
     finally  {
         putLock.unlock();  // 释放放锁,让其他线程可以调用offer方法
     }
     if  (c ==  0 // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
         signalNotEmpty();  // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
     return  c >=  0 // 添加成功返回true,否则返回false
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
put元素是将元素添加到队列尾部,queue满时进行await,添加成功后容量还未满,则进行signal.
代码的注释中基本把操作思想都说了, 有几个注意的地方
当queue满时, 会调用 notFull.await() 进行等待, 而相应的唤醒的地方有两处, 一个是 "有线程进行
put/offer 成功后且 (c +  1 ) < capacity 时 ", 另一处是 " 在线程进行 take/poll 成功 且 
(c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量) "代码中的 " signalNotEmpty" 这时在原来queue的数量 c (getAndIncrement的返回值是原来的值) 
== 0  时对此时在调用 take/poll 方法的线程进行唤醒。
 
public  void  put(E e)  throws  InterruptedException {
     if  (e ==  null throw  new  NullPointerException();  // 不允许空元素
     int  c = - 1 ;
     Node<E> node =  new  Node(e);  // 以新元素构造节点
     final  ReentrantLock putLock =  this .putLock;
     final  AtomicInteger count =  this .count;
     putLock.lockInterruptibly();  // 放锁加锁,保证调用put方法的时候只有1个线程
     try  {
         while  (count.get() == capacity) {  // 如果容量满了
             notFull.await();  // 阻塞并挂起当前线程
         }
         enqueue(node);  // 节点添加到链表尾部
         c = count.getAndIncrement();  // 元素个数+1
         if  (c +  1  < capacity)  // 如果容量还没满
             notFull.signal();  // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
     finally  {
         putLock.unlock();  // 释放放锁,让其他线程可以调用put方法
     }
     if  (c ==  0 // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
         signalNotEmpty();  // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  E poll() {
     final  AtomicInteger count =  this .count;
     if  (count.get() ==  0 // 如果元素个数为0
         return  null // 返回null
     E x =  null ;
     int  c = - 1 ;
     final  ReentrantLock takeLock =  this .takeLock;
     takeLock.lock();  // 拿锁加锁,保证调用poll方法的时候只有1个线程
     try  {
         if  (count.get() >  0 ) {  // 判断队列里是否还有数据
             x = dequeue();  // 删除头结点
             c = count.getAndDecrement();  // 元素个数-1
             if  (c >  1 // 如果队列里还有元素
                 notEmpty.signal();  // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
         }
     finally  {
         takeLock.unlock();  // 释放拿锁,让其他线程可以调用poll方法
     }
     if  (c == capacity)  // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
         signalNotFull();  // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
                 return  x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  E take()  throws  InterruptedException {
     E x;
     int  c = - 1 ;
     final  AtomicInteger count =  this .count;
     final  ReentrantLock takeLock =  this .takeLock;
     takeLock.lockInterruptibly();  // 拿锁加锁,保证调用take方法的时候只有1个线程
     try  {
         while  (count.get() ==  0 ) {  // 如果队列里已经没有元素了
             notEmpty.await();  // 阻塞并挂起当前线程
         }
         x = dequeue();  // 删除头结点
         c = count.getAndDecrement();  // 元素个数-1
         if  (c >  1 // 如果队列里还有元素
             notEmpty.signal();  // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
     finally  {
         takeLock.unlock();  // 释放拿锁,让其他线程可以调用take方法
     }
     if  (c == capacity)  // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
         signalNotFull();  // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
     return  x;
}
 
poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll
则直接返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  boolean  remove(Object o) {
     if  (o ==  null return  false ;
     fullyLock();  // remove操作要移动的位置不固定,2个锁都需要加锁
     try  {
         for  (Node<E> trail = head, p = trail.next;  // 从链表头结点开始遍历
              p !=  null ;
              trail = p, p = p.next) {
             if  (o.equals(p.item)) {  // 判断是否找到对象
                 unlink(p, trail);  // 修改节点的链接信息,同时调用notFull的signal方法
                 return  true ;
             }
         }
         return  false ;
     finally  {
         fullyUnlock();  // 2个锁解锁
     }
}


ArrayBlockingQueue


ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。


ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。


它带有的属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 存储队列元素的数组,是个循环数组
final  Object[] items;
 
// 拿数据的索引,用于take,poll,peek,remove方法
int  takeIndex;
 
// 放数据的索引,用于put,offer,add方法
int  putIndex;
 
// 元素个数
int  count;
 
// 可重入锁
final  ReentrantLock lock;
// notEmpty条件对象,由lock创建
private  final  Condition notEmpty;
// notFull条件对象,由lock创建
private  final  Condition notFull;
数据的添加


ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。


add方法:

1
2
3
4
5
6
public  boolean  add(E e) {
     if  (offer(e))
         return  true ;
     else
         throw  new  IllegalStateException( "Queue full" );
}

add方法内部调用offer方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  boolean  offer(E e) {
     checkNotNull(e);  // 不允许元素为空
     final  ReentrantLock lock =  this .lock;
     lock.lock();  // 加锁,保证调用offer方法的时候只有1个线程
     try  {
         if  (count == items.length)  // 如果队列已满
             return  false // 直接返回false,添加失败
         else  {
             insert(e);  // 数组没满的话调用insert方法
             return  true // 返回true,添加成功
         }
     finally  {
         lock.unlock();  // 释放锁,让其他线程可以调用offer方法
     }
}


insert方法如下:

1
2
3
4
5
6
private  void  insert(E x) {
     items[putIndex] = x;  // 元素添加到数组里
     putIndex = inc(putIndex);  // 放数据索引+1,当索引满了变成0
     ++count;  // 元素个数+1
     notEmpty.signal();  // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}

put方法:

1
2
3
4
5
6
7
8
9
10
11
12
public  void  put(E e)  throws  InterruptedException {
     checkNotNull(e);  // 不允许元素为空
     final  ReentrantLock lock =  this .lock;
     lock.lockInterruptibly();  // 加锁,保证调用put方法的时候只有1个线程
     try  {
         while  (count == items.length)  // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
             notFull.await();  // 线程阻塞并被挂起,同时释放锁
         insert(e);  // 调用insert方法
     finally  {
         lock.unlock();  // 释放锁,让其他线程可以调用put方法
     }
}


ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:


add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true


offer方法如果队列满了,返回false,否则返回true


add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。


这3个方法内部都会使用可重入锁保证原子性。


数据的删除


ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。


poll方法:

1
2
3
4
5
6
7
8
9
public  E poll() {
     final  ReentrantLock lock =  this .lock;
     lock.lock();  // 加锁,保证调用poll方法的时候只有1个线程
     try  {
         return  (count ==  0 ) ?  null  : extract();  // 如果队列里没元素了,返回null,否则调用extract方法
     finally  {
         lock.unlock();  // 释放锁,让其他线程可以调用poll方法
     }
}

poll方法内部调用extract方法:

1
2
3
4
5
6
7
8
9
private  E extract() {
     final  Object[] items =  this .items;
     E x =  this .<E>cast(items[takeIndex]);  // 得到取索引位置上的元素
     items[takeIndex] =  null // 对应取索引上的数据清空
     takeIndex = inc(takeIndex);  // 取数据索引+1,当索引满了变成0
     --count;  // 元素个数-1
     notFull.signal();  // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
     return  x;  // 返回元素
}

take方法:

1
2
3
4
5
6
7
8
9
10
11
public  E take()  throws  InterruptedException {
     final  ReentrantLock lock =  this .lock;
     lock.lockInterruptibly();  // 加锁,保证调用take方法的时候只有1个线程
     try  {
         while  (count ==  0 // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
             notEmpty.await();  // 线程阻塞并被挂起,同时释放锁
         return  extract();  // 调用extract方法
     finally  {
         lock.unlock();  // 释放锁,让其他线程可以调用take方法
     }
}

remove方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  boolean  remove(Object o) {
     if  (o ==  null return  false ;
     final  Object[] items =  this .items;
     final  ReentrantLock lock =  this .lock;
     lock.lock();  // 加锁,保证调用remove方法的时候只有1个线程
     try  {
         for  ( int  i = takeIndex, k = count; k >  0 ; i = inc(i), k--) {  // 遍历元素
             if  (o.equals(items[i])) {  // 两个对象相等的话
                 removeAt(i);  // 调用removeAt方法
                 return  true // 删除成功,返回true
             }
         }
         return  false // 删除成功,返回false
     finally  {
         lock.unlock();  // 释放锁,让其他线程可以调用remove方法
     }
}

removeAt方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void  removeAt( int  i) {
     final  Object[] items =  this .items;
     if  (i == takeIndex) {  // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可
         items[takeIndex] =  null ;
         takeIndex = inc(takeIndex);
     else  // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值
         for  (;;) {
             int  nexti = inc(i);
             if  (nexti != putIndex) {
                 items[i] = items[nexti];
                 i = nexti;
             else  {
                 items[i] =  null ;
                 putIndex = i;
                 break ;
             }
         }
     }
    --count;  // 元素个数-1
    notFull.signal();  // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满
    ,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
}

  

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:


poll方法对于队列为空的情况,返回null,否则返回队列头部元素。


remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。


poll方法和remove方法不会阻塞线程。


take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。


这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。



阻塞队列常用的着两个队列都即使这样的,所以就这么个事情。


参考博文:

http://fangjian0423.github.io/2016/05/10/java-arrayblockingqueue-linkedblockingqueue-analysis/


以后会陆续补上其他的方法。



本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1934068


相关文章
|
4月前
|
缓存 JavaScript Java
常见java OOM异常分析排查思路分析
Java虚拟机(JVM)遇到内存不足时会抛出OutOfMemoryError(OOM)异常。常见OOM情况包括:1) **Java堆空间不足**:大量对象未被及时回收或内存泄漏;2) **线程栈空间不足**:递归过深或大量线程创建;3) **方法区溢出**:类信息过多,如CGLib代理类生成过多;4) **本机内存不足**:JNI调用消耗大量内存;5) **GC造成的内存不足**:频繁GC但效果不佳。解决方法包括调整JVM参数(如-Xmx、-Xss)、优化代码及使用高效垃圾回收器。
214 15
常见java OOM异常分析排查思路分析
|
5天前
|
存储 Java 开发者
【潜意识Java】深入详细理解分析Java中的toString()方法重写完整笔记总结,超级详细。
本文详细介绍了 Java 中 `toString()` 方法的重写技巧及其重要
31 10
【潜意识Java】深入详细理解分析Java中的toString()方法重写完整笔记总结,超级详细。
|
3月前
|
存储 Java
【编程基础知识】 分析学生成绩:用Java二维数组存储与输出
本文介绍如何使用Java二维数组存储和处理多个学生的各科成绩,包括成绩的输入、存储及格式化输出,适合初学者实践Java基础知识。
106 1
|
5天前
|
Java 应用服务中间件 API
【潜意识Java】javaee中的SpringBoot在Java 开发中的应用与详细分析
本文介绍了 Spring Boot 的核心概念和使用场景,并通过一个实战项目演示了如何构建一个简单的 RESTful API。
23 5
|
5天前
|
人工智能 自然语言处理 搜索推荐
【潜意识Java】了解并详细分析Java与AIGC的结合应用和使用方式
本文介绍了如何将Java与AIGC(人工智能生成内容)技术结合,实现智能文本生成。
28 5
|
5天前
|
SQL Java 数据库连接
【潜意识Java】Java中JDBC过时方法的替代方案以及JDBC为什么过时详细分析
本文介绍了JDBC中一些常见过时方法及其替代方案。
25 5
|
5天前
|
Java 数据库连接 数据库
【潜意识Java】深度分析黑马项目《苍穹外卖》在Java学习中的重要性
《苍穹外卖》项目对Java学习至关重要。它涵盖了用户管理、商品查询、订单处理等模块,涉及Spring Boot、MyBatis、Redis等技术栈。
30 4
|
5天前
|
Java 数据库连接 数据库
【潜意识Java】使用 Ruoyi 框架开发企业级应用,从零开始的实践指南和分析问题
本文介绍了基于Spring Boot的开源企业级框架Ruoyi,涵盖环境搭建、项目初始化及用户管理模块的创建。
47 4
|
5天前
|
SQL Java API
|
5天前
|
SQL Java 数据库连接
【潜意识Java】深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
18 1