Java并发基础:ArrayBlockingQueue全面解析!

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: ArrayBlockingQueue类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。

Java并发基础:ArrayBlockingQueue全面解析! - 程序员古德

内容摘要

ArrayBlockingQueue类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。

核心概念

主要场景

在现实业务场景中,可以将ArrayBlockingQueue地运用到许多需要处理并发和资源限制的问题上,假设,团队正在构建一个在线订餐系统,其中有一个核心模块负责处理订单请求并将订单分配给餐厅厨房进行制作。

比如,厨房的每个工作台都有一定的处理能力,比如同时只能处理5个订单,超过这个数量,工作台就会变得拥挤而无法再接单,为了模拟这种有限的处理能力,可以创建一个容量为5的ArrayBlockingQueue

每当用户通过前端提交了一个新的订单请求时,后端的订单处理器线程会尝试将这个订单对象作为一个任务放入ArrayBlockingQueue中,如果此时队列未满,订单会被成功放入并通知厨房开始处理;但如果队列已满,则表示当前厨房工作台负荷过大,订单处理器线程会进入等待状态,直到厨房完成了一个订单并将结果从队列中取出后,新订单才有机会被加入队列。

此刻,ArrayBlockingQueue就像是厨房与订单处理器之间的缓冲区和信号灯,它既能控制流入厨房的订单流,防止过载,又能确保订单处理器在没有订单可处理时不会空转浪费资源,从而保证整个系统的稳定性和效率。

主要功能

ArrayBlockingQueue主要用于解决以下功能问题:

  1. 多线程间的数据共享
    在多线程编程中,线程之间经常需要共享数据ArrayBlockingQueue作为一个线程安全的队列,允许不同线程安全地添加和移除元素,它内部的同步机制确保了在并发环境下数据的一致性和完整性。
  2. 生产者-消费者协作
    ArrayBlockingQueue是实现生产者-消费者模式的理想选择,在生产者-消费者模式中,生产者产生数据放入缓冲区,而消费者从缓冲区中取走数据,ArrayBlockingQueue的阻塞特性能够自动调节生产者和消费者的速度:当缓冲区满时,生产者会被阻塞直到有空间可用;当缓冲区空时,消费者会被阻塞直到有数据可取。
  3. 流量控制
    由于ArrayBlockingQueue是一个有界队列,它可以用来实现流量控制,通过设置队列的最大容量,可以限制系统中待处理的任务或数据的数量,这对于防止资源过载和维持系统的稳定性至关重要。
  4. 任务调度与负载均衡
    在并发系统中,ArrayBlockingQueue可以作为任务队列使用,用于存储待执行的任务,线程池中的工作线程可以从队列中取出任务进行处理,从而实现任务的调度和负载均衡。
  5. 解耦
    使用ArrayBlockingQueue可以将数据的生产和消费解耦,生产者不需要知道消费者的具体实现,只需要将数据放入队列;同样,消费者也不需要知道生产者的具体实现,只需要从队列中取出数据,这提高了系统的可维护性和可扩展性。
  6. 缓冲
    ArrayBlockingQueue作为一个缓冲区,可以平滑生产者和消费者之间的速度差异,当生产者速度较快时,队列可以存储多余的数据;当消费者速度较快时,队列可以提供足够的数据供其消费,这有助于减少系统的响应时间和提高吞吐量。

代码案例

下面是一个简单的Java程序,演示了如何使用ArrayBlockingQueue类实现一个生产者-消费者场景,其中生产者线程向队列中添加数据,而消费者线程从队列中移除数据,如下代码:

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  

// 生产者类,用于向队列中添加数据  
class Producer implements Runnable {
   
     
    private final BlockingQueue<Integer> queue;  
    private final int maxSize;  

    public Producer(BlockingQueue<Integer> queue, int maxSize) {
   
     
        this.queue = queue;  
        this.maxSize = maxSize;  
    }  

    @Override  
    public void run() {
   
     
        try {
   
     
            for (int i = 0; i < maxSize; i++) {
   
     
                // 模拟生产时间  
                Thread.sleep((long) (Math.random() * 1000));  
                queue.put(i); // 将数据放入队列,如果队列已满则阻塞  
                System.out.println("Produced: " + i);  
            }  
        } catch (InterruptedException e) {
   
     
            Thread.currentThread().interrupt();  
        }  
    }  
}  

// 消费者类,用于从队列中移除数据  
class Consumer implements Runnable {
   
     
    private final BlockingQueue<Integer> queue;  

    public Consumer(BlockingQueue<Integer> queue) {
   
     
        this.queue = queue;  
    }  

    @Override  
    public void run() {
   
     
        try {
   
     
            while (true) {
   
     
                Integer consumed = queue.take(); // 从队列中取出数据,如果队列为空则阻塞  
                System.out.println("Consumed: " + consumed);  
                // 假设消费完maxSize-1个元素后,消费者就不再消费了  
                if (consumed == queue.remainingCapacity()) {
   
     
                    break;  
                }  
                // 模拟消费时间  
                Thread.sleep((long) (Math.random() * 1000));  
            }  
        } catch (InterruptedException e) {
   
     
            Thread.currentThread().interrupt();  
        }  
    }  
}  

// 主类,包含main方法,用于启动生产者和消费者线程  
public class ArrayBlockingQueueDemo {
   
     
    public static void main(String[] args) {
   
     
        int queueSize = 5; // 队列大小  
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize); // 创建一个有界阻塞队列  

        // 启动生产者线程  
        Thread producerThread = new Thread(new Producer(queue, queueSize));  
        producerThread.start();  

        // 启动消费者线程  
        Thread consumerThread = new Thread(new Consumer(queue));  
        consumerThread.start();  
    }  
}

在这个例子中,创建了一个大小为5的ArrayBlockingQueue,生产者线程会生成从0到4的整数,并尝试将它们放入队列中,如果队列已满,生产者线程会阻塞,直到队列中有空间可用,消费者线程会不断尝试从队列中取出元素,如果队列为空,消费者线程会阻塞,直到队列中有元素可取,生产者和消费者线程都使用了Thread.sleep()方法来模拟生产和消费的时间延迟。

Produced: 0  
Produced: 1  
Consumed: 0  
Produced: 2  
Consumed: 1  
Produced: 3  
Consumed: 2  
Produced: 4  
Consumed: 3  
Consumed: 4

核心API

ArrayBlockingQueue实现了一个基于数组的有界阻塞队列,这个队列按照 FIFO(先进先出)的原则对元素进行排序,当尝试向已满的队列中放入元素时,操作将会被阻塞;当尝试从空队列中取出元素时,操作也会被阻塞,以下是ArrayBlockingQueue类中一些主要方法的含义:

1、核心构造方法

  1. ArrayBlockingQueue(int capacity): 创建一个具有给定容量的新的ArrayBlockingQueue实例。
  2. ArrayBlockingQueue(int capacity, boolean fair): 创建一个具有给定容量和公平性设置的新ArrayBlockingQueue实例,如果设置为公平,等待时间最长的线程将获得访问队列的优先权;如果设置为不公平,则访问顺序是不确定的。

2、添加元素

  1. add(E e): 将指定的元素插入此队列的尾部,如果队列已满,则抛出IllegalStateException
  2. offer(E e): 将指定的元素插入此队列的尾部,如果队列已满,则返回false
  3. put(E e) throws InterruptedException: 将指定的元素插入此队列的尾部,等待必要的空间变得可用,如果当前线程被中断,则抛出InterruptedException
  4. offer(E e, long timeout, TimeUnit unit): 将指定的元素插入此队列的尾部,等待指定的时间以使空间变得可用,如果在指定的时间内队列仍然满,则返回false

3、移除元素

  1. remove(): 移除并返回此队列的头部,如果队列为空,则抛出NoSuchElementException
  2. poll(): 移除并返回此队列的头部,或者如果队列为空,则返回null
  3. take() throws InterruptedException: 移除并返回此队列的头部,等待元素变得可用,如果当前线程被中断,则抛出InterruptedException
  4. poll(long timeout, TimeUnit unit): 移除并返回此队列的头部,等待指定的时间以使元素可用,如果在指定的时间内队列仍然为空,则返回null

4、检查元素

  1. element(): 获取但不移除此队列的头部,如果队列为空,则抛出NoSuchElementException
  2. peek(): 获取但不移除此队列的头部,或者如果队列为空,则返回null

5、其他方法

  1. size(): 返回队列中的元素数量。
  2. remainingCapacity(): 返回队列的理想最大容量与当前大小之间的差值。
  3. clear(): 移除此队列中的所有元素。
  4. contains(Object o): 如果此队列包含指定的元素,则返回true
  5. drainTo(Collection<? super E> c): 移除此队列中所有可用的元素,并将它们添加到给定的集合中。
  6. drainTo(Collection<? super E> c, int maxElements): 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定的集合中。
  7. toArray(): 返回以适当顺序包含此队列中所有元素的数组。
  8. iterator(): 返回在此队列的元素上进行迭代的迭代器。

核心总结

Java并发基础:ArrayBlockingQueue全面解析! - 程序员古德

ArrayBlockingQueue是一个非常实用的有界阻塞队列,其优点在于,基于数组实现,内存占用连续,查询速度快;同时支持多线程间的同步操作,能够很好地处理生产者-消费者问题。

另外,它还可以设置公平性,确保等待时间最长的线程优先获取资源。但是,由于是基于数组实现的,所以在初始化时需要指定队列大小,且之后无法改变,这在某些场景下可能不够灵活,当队列满或空时,相关操作会被阻塞,如果处理不当,可能会导致线程挂起或资源浪费。

在使用ArrayBlockingQueue时,要合理设置队列大小,避免过大或过小,同时,要注意处理阻塞情况,可以通过设置超时时间或使用offerpoll等非阻塞方法来避免线程长时间等待,此外,在多线程环境下使用时,要注意线程安全问题,确保数据的正确性和一致性。

关注我,每天学习互联网编程技术 - 程序员古德

END!

往期回顾

Java并发基础:LinkedTransferQueue全面解析!

Java并发基础:BlockingQueue和BlockingDeque接口的区别?

Java并发基础:Deque接口和Queue接口的区别?

Spring核心基础:全面总结Spring中提供的那些基础工具类!

Java并发基础:FutureTask全面解析!

相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
16 2
|
7天前
|
Java
轻松上手Java字节码编辑:IDEA插件VisualClassBytes全方位解析
本插件VisualClassBytes可修改class字节码,包括class信息、字段信息、内部类,常量池和方法等。
52 6
|
5天前
|
存储 算法 Java
Java Set深度解析:为何它能成为“无重复”的代名词?
Java的集合框架中,Set接口以其“无重复”特性著称。本文解析了Set的实现原理,包括HashSet和TreeSet的不同数据结构和算法,以及如何通过示例代码实现最佳实践。选择合适的Set实现类和正确实现自定义对象的hashCode()和equals()方法是关键。
18 4
|
8天前
|
Java 编译器 数据库连接
Java中的异常处理机制深度解析####
本文深入探讨了Java编程语言中异常处理机制的核心原理、类型及其最佳实践,旨在帮助开发者更好地理解和应用这一关键特性。通过实例分析,揭示了try-catch-finally结构的重要性,以及如何利用自定义异常提升代码的健壮性和可读性。文章还讨论了异常处理在大型项目中的最佳实践,为提高软件质量提供指导。 ####
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
18天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
5天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
8天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
5天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
8天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
22 3

推荐镜像

更多