Java并发阻塞队列之ArrayBlockingQueue

简介: Java并发阻塞队列之ArrayBlockingQueue

JUC简介

在 Java 5.0 提供了java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步IO和轻量级任务框架;还提供了设计用于多线程上下文中的Collection实现等;

今天要讲的ArrayBlockingQueue便是JUC包下的一个工具类。

ArrayBlockingQueue简介

ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。线程安全是指类内部通过“互斥锁”保护竞争资源,实现多线程对竞争资源的互斥访问。

“有界”则是指ArrayBlockingQueue对应的数组是有界限且固定的,在创建对象时由构造函数指定,一旦指定则无法更改。

阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;

ArrayBlockingQueue是按FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。

ArrayBlockingQueue函数列表

  1. // 创建一个带有给定的(固定)容量和默认访问策略的ArrayBlockingQueue。
  2. ArrayBlockingQueue(int capacity)
  3. // 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue。
  4. ArrayBlockingQueue(int capacity,boolean fair)
  5. // 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定collection的元素,并以collection迭代器的遍历顺序添加元素。
  6. ArrayBlockingQueue(int capacity,boolean fair,Collection<?extends E> c)

  7. // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回true,如果此队列已满,则抛出IllegalStateException。
  8. boolean add(E e)
  9. // 自动移除此队列中的所有元素。
  10. void clear()
  11. // 如果此队列包含指定的元素,则返回true。
  12. boolean contains(Object o)
  13. // 移除此队列中所有可用的元素,并将它们添加到给定collection中。
  14. int drainTo(Collection<?super E> c)
  15. // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection中。
  16. int drainTo(Collection<?super E> c,int maxElements)
  17. // 返回在此队列中的元素上按适当顺序进行迭代的迭代器。
  18. Iterator<E> iterator()
  19. // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回false。
  20. boolean offer(E e)
  21. // 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
  22. boolean offer(E e,long timeout,TimeUnit unit)
  23. // 获取但不移除此队列的头;如果此队列为空,则返回null。
  24. E peek()
  25. // 获取并移除此队列的头,如果此队列为空,则返回null。
  26. E poll()
  27. // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
  28. E poll(long timeout,TimeUnit unit)
  29. // 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
  30. void put(E e)
  31. // 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。
  32. int remainingCapacity()
  33. // 从此队列中移除指定元素的单个实例(如果存在)。
  34. boolean remove(Object o)
  35. // 返回此队列中元素的数量。
  36. int size()
  37. // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
  38. E take()
  39. // 返回一个按适当顺序包含此队列中所有元素的数组。
  40. Object[] toArray()
  41. // 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
  42. <T> T[] toArray(T[] a)
  43. // 返回此 collection 的字符串表示形式。
  44. String toString()

源代码分析

构造函数

ArrayBlockingQueue提供了三个构造函数。

  1.    publicArrayBlockingQueue(int capacity){
  2.        this(capacity,false);
  3.    }

  4.    publicArrayBlockingQueue(int capacity,boolean fair){
  5.        if(capacity <=0)
  6.            thrownewIllegalArgumentException();
  7.        this.items =newObject[capacity];
  8.        lock=newReentrantLock(fair);
  9.        notEmpty =lock.newCondition();
  10.        notFull =  lock.newCondition();
  11.    }

  12.    publicArrayBlockingQueue(int capacity,boolean fair,
  13.                              Collection<?extends E> c){
  14.        this(capacity, fair);

  15.        finalReentrantLocklock=this.lock;
  16.        lock.lock();// Lock only for visibility, not mutual exclusion
  17.        try{
  18.            int i =0;
  19.            try{
  20.                for(E e : c){
  21.                    checkNotNull(e);
  22.                    items[i++]= e;
  23.                }
  24.            }catch(ArrayIndexOutOfBoundsException ex){
  25.                thrownewIllegalArgumentException();
  26.            }
  27.            count = i;
  28.            putIndex =(i == capacity)?0: i;
  29.        }finally{
  30.            lock.unlock();
  31.        }
  32.    }

其中,第一个构造函数只需指定队列(数组)初始化大小,这正是前面提到的“有界”的边界所在。同时,它调用了第二个构造函数,默认将fair参数传值为false。

fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

构造函数中this.items对应的代码为:

  1. /** The queued items */
  2.    finalObject[] items;

这是存储阻塞队列数据的数组。在第三个构造函数中提供了初始化队列数组中数据的方法。

加入队列

ArrayBlockingQueue提供了4个方法将元素添加入队列。


  • add(E e) :如果立即可行且不会超过该队列的容量,将指定的元素插入到队列的尾部。成功返回true,队列已满则抛出IllegalStateException("Queue full")异常。


  • offer(E e) :如果立即可行且不会超过该队列的容量,将指定的元素插入到此队列的尾部。成功返回true,队列已满则返回false。


  • offer(E e, long timeout, TimeUnit unit) :将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间


  • put(E e) :将指定的元素插入此队列的尾部,如果队列已满则等待可用的空间。成功则返回true,等待超时则返回false。

源代码说明:

  1. // 方法一
  2. publicboolean add(E e){
  3.    returnsuper.add(e);
  4. }
  5. // 方法二
  6. publicboolean add(E e){
  7.    if(offer(e))
  8.        returntrue;
  9.    else
  10.        thrownewIllegalStateException("Queue full");
  11. }
  12. // 方法三
  13. publicboolean offer(E e){
  14.    checkNotNull(e);
  15.    finalReentrantLocklock=this.lock;
  16.    lock.lock();  // 一直等到获取锁
  17.    try{
  18.        if(count == items.length)  //假如当前容纳的元素个数已经等于数组长度,那么返回false
  19.            returnfalse;
  20.        else{
  21.            enqueue(e);        // 将元素插入到队列中,返回true
  22.            returntrue;
  23.        }
  24.    }finally{
  25.        lock.unlock();        //释放锁
  26.    }
  27. }

add方法调用了父类的add方法(方法二),通过父类的add方法可以得出,最终还是调用了offer方法(方法三)。可以看出,父类方法调用offer之后,如果offer返回false,则表示队列已满,父类方法会抛出异常。

而offer方法首先校验添加的对象是否为null,如果null则直接抛出空指针异常。然后获得锁进行队列大小(count记录了队列中元素的个数)比较,如果当前队列中的元素个数与count相等,则返回false,不进行插入。否则,将元素插入队列,并返回true。

下面再看一下offer中调用的enqueue方法:

  1. privatevoid enqueue(E x){    

  2.    //调用enqueue的方法都都已经进行过同步处理
  3.    // assert lock.getHoldCount() == 1;
  4.    // assert items[putIndex] == null;
  5.    finalObject[] items =this.items;
  6.    items[putIndex]= x;        //putIndex是下一个被添加元素的坐标
  7.    if(++putIndex == items.length)    
  8.    //putIndex+1, 并且与数组长度进行比较,相同则从数组开头
  9.        putIndex =0;            //插入元素,这就是循环数组的奥秘了
  10.    count++;                //当前元素总量+1
  11.    notEmpty.signal();            //给等到在数组非空的线程一个信号,唤醒他们。
  12. }

enqueue方法才是所有添加队列方法真正调用来操作添加的方法,其中putIndex是下一个被添加元素的坐标。整个方法的业务逻辑是这样的:首先将待添加的元素添加到putIndex所在的位置,并且对putIndex进行自增(指向下一个待添加的位置)。然后比较下一个待添加的位置是否和数组的长度相同,如果相同则将putIndex指向数组开头(进入此方法的前提条件是队列数组未满)。然后队列总量加1。

通过这段代码我们就可以真正了解到ArrayBlockingQueue是如何循环使用数组的。首先创建一个定长空数组,然后依次填满数组的0,1,2,……,items.length-1 位置。与此同时,队列中的0,1,2,……位置的元素也在不停的被消费掉。当数组的items.length-1也被填充了元素,次数队列依旧未满,那么新增的元素将放置在哪里?对了,就是像上面的代码一样,会从数组的0坐标重新依次开始添加新的元素。通过这种方式,ArrayBlockingQueue实现了在定长数组下FIFO的队列。

取出队列

ArrayBlockingQueue提供了以下方法支持取出队列:


  • poll() :获取并移除此队列的头,如果队列为空,则返回null。


  • poll(long timeout, TimeUnit unit) :获取并移除此队列的头部,在指定的等待时间前等待可用的元素,超时则返回null。


  • remove(Object o) :从此队列中移除指定元素的单个实例(如果存在多个则只移除第一个)。如果不存在要移除的元素则返回false。


  • take() :获取并移除此队列的头部,如果队列为空,则一直等待可用元素,也就是说必须要拿到一个元素,除非线程中断。


  • peek():获取队列中takeIndex(待获取元素索引)位置的元素,如果为null则返回空。

源代码:

  1. public E poll(){
  2.    finalReentrantLocklock=this.lock;
  3.    lock.lock();
  4.    try{
  5.        return(count ==0)?null: dequeue();
  6.    }finally{
  7.        lock.unlock();
  8.    }
  9. }

  10. public E take()throwsInterruptedException{
  11.    finalReentrantLocklock=this.lock;
  12.    lock.lockInterruptibly();
  13.    try{
  14.        while(count ==0)
  15.            notEmpty.await();
  16.        return dequeue();
  17.    }finally{
  18.        lock.unlock();
  19.    }
  20. }

  21. public E poll(long timeout,TimeUnit unit)throwsInterruptedException{
  22.    long nanos = unit.toNanos(timeout);
  23.    finalReentrantLocklock=this.lock;
  24.    lock.lockInterruptibly();
  25.    try{
  26.        while(count ==0){
  27.            if(nanos <=0)
  28.                returnnull;
  29.            nanos = notEmpty.awaitNanos(nanos);
  30.        }
  31.        return dequeue();
  32.    }finally{
  33.        lock.unlock();
  34.    }
  35. }

  36. public E peek(){
  37.    finalReentrantLocklock=this.lock;
  38.    lock.lock();
  39.    try{
  40.        return itemAt(takeIndex);// null when queue is empty
  41.    }finally{
  42.        lock.unlock();
  43.    }
  44. }

  45. publicboolean remove(Object o){
  46.    if(o ==null)returnfalse;
  47.    finalObject[] items =this.items;
  48.    finalReentrantLocklock=this.lock;
  49.    lock.lock();
  50.    try{
  51.        if(count >0){
  52.            finalint putIndex =this.putIndex;
  53.            int i = takeIndex;
  54.            do{
  55.                if(o.equals(items[i])){
  56.                    removeAt(i);
  57.                    returntrue;
  58.                }
  59.                if(++i == items.length)
  60.                    i =0;
  61.            }while(i != putIndex);
  62.        }
  63.        returnfalse;
  64.    }finally{
  65.        lock.unlock();
  66.    }
  67. }

下面重点分析一下取出队列原因调用的dequeue方法:

  1. /**
  2. * Extracts element at current take position, advances, and signals.
  3. * Call only when holding lock.
  4. */
  5. private E dequeue(){
  6.    // assert lock.getHoldCount() == 1;
  7.    // assert items[takeIndex] != null;
  8.    finalObject[] items =this.items;
  9.    // 取出指定元素
  10.    @SuppressWarnings("unchecked")
  11.    E x =(E) items[takeIndex];
  12.    // 并将取出元素索引内容置为null
  13.    items[takeIndex]=null;
  14.    // 将待取出索引+1,并与队列长度做比较,如果超出数组长度则从0重新开始
  15.    if(++takeIndex == items.length)
  16.        takeIndex =0;
  17.    count--;
  18.    // 如果迭代器不为null,则进行迭代处理
  19.    if(itrs !=null)
  20.        itrs.elementDequeued();
  21.    notFull.signal();
  22.    return x;
  23. }

在enqueue中解了添加元素进入队列的操作之后就不难理解从队列中取出数据的过程了。首先调用dequeue的取出操作,都会先将元素取出,然后再将数组对应位置置null。然后对takeIndex的位置进行后移1位,如果takeIndex处于数组的最后一位,则重新从0开始。

实战

在学习了基础理论知识之后,我们用一个实例来练习一下。

  1. package com.secbro2.juc;

  2. import java.util.Queue;
  3. import java.util.concurrent.ArrayBlockingQueue;

  4. /**
  5. * @author zzs
  6. */
  7. publicclassArrayBlockingQueueDemo{

  8.    privatestaticQueue<String> queue =newArrayBlockingQueue<String>(20);

  9.    publicstaticvoid main(String[] args){
  10.        newQueueThread("QTA").start();
  11.        newQueueThread("QTB").start();
  12.    }

  13.    privatestaticclassQueueThreadextendsThread{
  14.        QueueThread(String name){
  15.            super(name);
  16.        }

  17.        @Override
  18.        publicvoid run(){
  19.            for(int i =0; i <6; i++){
  20.                // 线程名称+序号
  21.                String str =Thread.currentThread().getName()+"-"+ i;
  22.                queue.add(str);
  23.                printQueue();
  24.            }

  25.        }
  26.    }

  27.    privatestaticvoid printQueue(){
  28.        StringBuilder sb =newStringBuilder();
  29.        for(Object aQueue : queue){
  30.            sb.append(aQueue).append(",");
  31.        }

  32.        System.out.println(sb.toString());
  33.    }
  34. }

打印结果如下:

  1. QTA-0,QTB-0,
  2. QTA-0,QTB-0,
  3. QTA-0,QTB-0,QTA-1,
  4. QTA-0,QTB-0,QTA-1,QTB-1,
  5. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,
  6. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,
  7. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,
  8. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,
  9. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
  10. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
  11. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,
  12. QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,QTA-5,
目录
相关文章
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
56 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
66 5
|
4月前
|
存储 算法 Java
Java 中的同步集合和并发集合
【8月更文挑战第22天】
53 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
63 2
|
4月前
|
Java 开发者
【编程高手必备】Java多线程编程实战揭秘:解锁高效并发的秘密武器!
【8月更文挑战第22天】Java多线程编程是提升软件性能的关键技术,可通过继承`Thread`类或实现`Runnable`接口创建线程。为确保数据一致性,可采用`synchronized`关键字或`ReentrantLock`进行线程同步。此外,利用`wait()`和`notify()`方法实现线程间通信。预防死锁策略包括避免嵌套锁定、固定锁顺序及设置获取锁的超时。掌握这些技巧能有效增强程序的并发处理能力。
29 2
|
4月前
|
存储 安全 Java
从基础到实战:如何用 Java 手写一个阻塞队列?
大家好,我是小米!今天分享手写阻塞队列(Blocking Queue)教程,深入讲解并发编程中的 wait() 和 notifyAll() 机制,通过代码实战,让你轻松掌握生产者-消费者模型中的阻塞队列实现!
119 0