Java并发基础:ArrayBlockingQueue全面解析!

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: ArrayBlockingQueue类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。

Java并发基础:ArrayBlockingQueue全面解析! - 程序员古德

内容摘要

ArrayBlockingQueue类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。

核心概念

主要场景

在现实业务场景中,可以将ArrayBlockingQueue地运用到许多需要处理并发和资源限制的问题上,假设,团队正在构建一个在线订餐系统,其中有一个核心模块负责处理订单请求并将订单分配给餐厅厨房进行制作。

比如,厨房的每个工作台都有一定的处理能力,比如同时只能处理5个订单,超过这个数量,工作台就会变得拥挤而无法再接单,为了模拟这种有限的处理能力,可以创建一个容量为5的ArrayBlockingQueue

每当用户通过前端提交了一个新的订单请求时,后端的订单处理器线程会尝试将这个订单对象作为一个任务放入ArrayBlockingQueue中,如果此时队列未满,订单会被成功放入并通知厨房开始处理;但如果队列已满,则表示当前厨房工作台负荷过大,订单处理器线程会进入等待状态,直到厨房完成了一个订单并将结果从队列中取出后,新订单才有机会被加入队列。

此刻,ArrayBlockingQueue就像是厨房与订单处理器之间的缓冲区和信号灯,它既能控制流入厨房的订单流,防止过载,又能确保订单处理器在没有订单可处理时不会空转浪费资源,从而保证整个系统的稳定性和效率。

主要功能

ArrayBlockingQueue主要用于解决以下功能问题:

  1. 多线程间的数据共享
    在多线程编程中,线程之间经常需要共享数据ArrayBlockingQueue作为一个线程安全的队列,允许不同线程安全地添加和移除元素,它内部的同步机制确保了在并发环境下数据的一致性和完整性。
  2. 生产者-消费者协作
    ArrayBlockingQueue是实现生产者-消费者模式的理想选择,在生产者-消费者模式中,生产者产生数据放入缓冲区,而消费者从缓冲区中取走数据,ArrayBlockingQueue的阻塞特性能够自动调节生产者和消费者的速度:当缓冲区满时,生产者会被阻塞直到有空间可用;当缓冲区空时,消费者会被阻塞直到有数据可取。
  3. 流量控制
    由于ArrayBlockingQueue是一个有界队列,它可以用来实现流量控制,通过设置队列的最大容量,可以限制系统中待处理的任务或数据的数量,这对于防止资源过载和维持系统的稳定性至关重要。
  4. 任务调度与负载均衡
    在并发系统中,ArrayBlockingQueue可以作为任务队列使用,用于存储待执行的任务,线程池中的工作线程可以从队列中取出任务进行处理,从而实现任务的调度和负载均衡。
  5. 解耦
    使用ArrayBlockingQueue可以将数据的生产和消费解耦,生产者不需要知道消费者的具体实现,只需要将数据放入队列;同样,消费者也不需要知道生产者的具体实现,只需要从队列中取出数据,这提高了系统的可维护性和可扩展性。
  6. 缓冲
    ArrayBlockingQueue作为一个缓冲区,可以平滑生产者和消费者之间的速度差异,当生产者速度较快时,队列可以存储多余的数据;当消费者速度较快时,队列可以提供足够的数据供其消费,这有助于减少系统的响应时间和提高吞吐量。

代码案例

下面是一个简单的Java程序,演示了如何使用ArrayBlockingQueue类实现一个生产者-消费者场景,其中生产者线程向队列中添加数据,而消费者线程从队列中移除数据,如下代码:

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  

// 生产者类,用于向队列中添加数据  
class Producer implements Runnable {
   
     
    private final BlockingQueue<Integer> queue;  
    private final int maxSize;  

    public Producer(BlockingQueue<Integer> queue, int maxSize) {
   
     
        this.queue = queue;  
        this.maxSize = maxSize;  
    }  

    @Override  
    public void run() {
   
     
        try {
   
     
            for (int i = 0; i < maxSize; i++) {
   
     
                // 模拟生产时间  
                Thread.sleep((long) (Math.random() * 1000));  
                queue.put(i); // 将数据放入队列,如果队列已满则阻塞  
                System.out.println("Produced: " + i);  
            }  
        } catch (InterruptedException e) {
   
     
            Thread.currentThread().interrupt();  
        }  
    }  
}  

// 消费者类,用于从队列中移除数据  
class Consumer implements Runnable {
   
     
    private final BlockingQueue<Integer> queue;  

    public Consumer(BlockingQueue<Integer> queue) {
   
     
        this.queue = queue;  
    }  

    @Override  
    public void run() {
   
     
        try {
   
     
            while (true) {
   
     
                Integer consumed = queue.take(); // 从队列中取出数据,如果队列为空则阻塞  
                System.out.println("Consumed: " + consumed);  
                // 假设消费完maxSize-1个元素后,消费者就不再消费了  
                if (consumed == queue.remainingCapacity()) {
   
     
                    break;  
                }  
                // 模拟消费时间  
                Thread.sleep((long) (Math.random() * 1000));  
            }  
        } catch (InterruptedException e) {
   
     
            Thread.currentThread().interrupt();  
        }  
    }  
}  

// 主类,包含main方法,用于启动生产者和消费者线程  
public class ArrayBlockingQueueDemo {
   
     
    public static void main(String[] args) {
   
     
        int queueSize = 5; // 队列大小  
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize); // 创建一个有界阻塞队列  

        // 启动生产者线程  
        Thread producerThread = new Thread(new Producer(queue, queueSize));  
        producerThread.start();  

        // 启动消费者线程  
        Thread consumerThread = new Thread(new Consumer(queue));  
        consumerThread.start();  
    }  
}

在这个例子中,创建了一个大小为5的ArrayBlockingQueue,生产者线程会生成从0到4的整数,并尝试将它们放入队列中,如果队列已满,生产者线程会阻塞,直到队列中有空间可用,消费者线程会不断尝试从队列中取出元素,如果队列为空,消费者线程会阻塞,直到队列中有元素可取,生产者和消费者线程都使用了Thread.sleep()方法来模拟生产和消费的时间延迟。

Produced: 0  
Produced: 1  
Consumed: 0  
Produced: 2  
Consumed: 1  
Produced: 3  
Consumed: 2  
Produced: 4  
Consumed: 3  
Consumed: 4

核心API

ArrayBlockingQueue实现了一个基于数组的有界阻塞队列,这个队列按照 FIFO(先进先出)的原则对元素进行排序,当尝试向已满的队列中放入元素时,操作将会被阻塞;当尝试从空队列中取出元素时,操作也会被阻塞,以下是ArrayBlockingQueue类中一些主要方法的含义:

1、核心构造方法

  1. ArrayBlockingQueue(int capacity): 创建一个具有给定容量的新的ArrayBlockingQueue实例。
  2. ArrayBlockingQueue(int capacity, boolean fair): 创建一个具有给定容量和公平性设置的新ArrayBlockingQueue实例,如果设置为公平,等待时间最长的线程将获得访问队列的优先权;如果设置为不公平,则访问顺序是不确定的。

2、添加元素

  1. add(E e): 将指定的元素插入此队列的尾部,如果队列已满,则抛出IllegalStateException
  2. offer(E e): 将指定的元素插入此队列的尾部,如果队列已满,则返回false
  3. put(E e) throws InterruptedException: 将指定的元素插入此队列的尾部,等待必要的空间变得可用,如果当前线程被中断,则抛出InterruptedException
  4. offer(E e, long timeout, TimeUnit unit): 将指定的元素插入此队列的尾部,等待指定的时间以使空间变得可用,如果在指定的时间内队列仍然满,则返回false

3、移除元素

  1. remove(): 移除并返回此队列的头部,如果队列为空,则抛出NoSuchElementException
  2. poll(): 移除并返回此队列的头部,或者如果队列为空,则返回null
  3. take() throws InterruptedException: 移除并返回此队列的头部,等待元素变得可用,如果当前线程被中断,则抛出InterruptedException
  4. poll(long timeout, TimeUnit unit): 移除并返回此队列的头部,等待指定的时间以使元素可用,如果在指定的时间内队列仍然为空,则返回null

4、检查元素

  1. element(): 获取但不移除此队列的头部,如果队列为空,则抛出NoSuchElementException
  2. peek(): 获取但不移除此队列的头部,或者如果队列为空,则返回null

5、其他方法

  1. size(): 返回队列中的元素数量。
  2. remainingCapacity(): 返回队列的理想最大容量与当前大小之间的差值。
  3. clear(): 移除此队列中的所有元素。
  4. contains(Object o): 如果此队列包含指定的元素,则返回true
  5. drainTo(Collection<? super E> c): 移除此队列中所有可用的元素,并将它们添加到给定的集合中。
  6. drainTo(Collection<? super E> c, int maxElements): 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定的集合中。
  7. toArray(): 返回以适当顺序包含此队列中所有元素的数组。
  8. iterator(): 返回在此队列的元素上进行迭代的迭代器。

核心总结

Java并发基础:ArrayBlockingQueue全面解析! - 程序员古德

ArrayBlockingQueue是一个非常实用的有界阻塞队列,其优点在于,基于数组实现,内存占用连续,查询速度快;同时支持多线程间的同步操作,能够很好地处理生产者-消费者问题。

另外,它还可以设置公平性,确保等待时间最长的线程优先获取资源。但是,由于是基于数组实现的,所以在初始化时需要指定队列大小,且之后无法改变,这在某些场景下可能不够灵活,当队列满或空时,相关操作会被阻塞,如果处理不当,可能会导致线程挂起或资源浪费。

在使用ArrayBlockingQueue时,要合理设置队列大小,避免过大或过小,同时,要注意处理阻塞情况,可以通过设置超时时间或使用offerpoll等非阻塞方法来避免线程长时间等待,此外,在多线程环境下使用时,要注意线程安全问题,确保数据的正确性和一致性。

关注我,每天学习互联网编程技术 - 程序员古德

END!

往期回顾

Java并发基础:LinkedTransferQueue全面解析!

Java并发基础:BlockingQueue和BlockingDeque接口的区别?

Java并发基础:Deque接口和Queue接口的区别?

Spring核心基础:全面总结Spring中提供的那些基础工具类!

Java并发基础:FutureTask全面解析!

相关文章
|
5天前
|
安全 Java 测试技术
🎉Java零基础:全面解析枚举的强大功能
【10月更文挑战第19天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
96 60
|
5天前
|
Java 程序员 开发者
Java中的异常处理机制深度解析####
本文将深入浅出地探讨Java编程语言中异常处理的核心概念与实践策略,旨在帮助开发者更好地理解如何构建健壮的应用程序。通过剖析异常体系结构、掌握有效的异常捕获与处理技巧,以及学习最佳实践,读者能够提升代码质量,减少运行时错误,从而增强软件的稳定性和用户体验。 ####
|
3天前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
13 2
|
4天前
|
存储 缓存 安全
🌟Java零基础:深入解析Java序列化机制
【10月更文挑战第20天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
14 3
|
3天前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
12 1
|
Java
Java并发编程笔记之ArrayBlockingQueue源码分析
JDK 中基于数组的阻塞队列 ArrayBlockingQueue 原理剖析,ArrayBlockingQueue 内部如何基于一把独占锁以及对应的两个条件变量实现出入队操作的线程安全? 首先我们先大概的浏览一下ArrayBlockingQueue 的内部构造,如下类图: 如类图所示,可以看到ArrayBlockingQueue 内部有个数组items 用来存放队列元素,putIndex变量标示入队元素的下标,takeIndex是出队的下标,count是用来统计队列元素个数, 从定义可以知道,这些属性并没有使用valatile修饰,这是因为访问这些变量的使用都是在锁块内被用。
4366 0
|
7天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
77 38
|
4天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
8天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
23 1
[Java]线程生命周期与线程通信
|
6天前
|
安全 Java
在 Java 中使用实现 Runnable 接口的方式创建线程
【10月更文挑战第22天】通过以上内容的介绍,相信你已经对在 Java 中如何使用实现 Runnable 接口的方式创建线程有了更深入的了解。在实际应用中,需要根据具体的需求和场景,合理选择线程创建方式,并注意线程安全、同步、通信等相关问题,以确保程序的正确性和稳定性。

推荐镜像

更多