内容摘要
ArrayBlockingQueue
类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。
核心概念
主要场景
在现实业务场景中,可以将ArrayBlockingQueue
地运用到许多需要处理并发和资源限制的问题上,假设,团队正在构建一个在线订餐系统,其中有一个核心模块负责处理订单请求并将订单分配给餐厅厨房进行制作。
比如,厨房的每个工作台都有一定的处理能力,比如同时只能处理5个订单,超过这个数量,工作台就会变得拥挤而无法再接单,为了模拟这种有限的处理能力,可以创建一个容量为5的ArrayBlockingQueue
。
每当用户通过前端提交了一个新的订单请求时,后端的订单处理器线程会尝试将这个订单对象作为一个任务放入ArrayBlockingQueue
中,如果此时队列未满,订单会被成功放入并通知厨房开始处理;但如果队列已满,则表示当前厨房工作台负荷过大,订单处理器线程会进入等待状态,直到厨房完成了一个订单并将结果从队列中取出后,新订单才有机会被加入队列。
此刻,ArrayBlockingQueue
就像是厨房与订单处理器之间的缓冲区和信号灯,它既能控制流入厨房的订单流,防止过载,又能确保订单处理器在没有订单可处理时不会空转浪费资源,从而保证整个系统的稳定性和效率。
主要功能
ArrayBlockingQueue
主要用于解决以下功能问题:
- 多线程间的数据共享:
在多线程编程中,线程之间经常需要共享数据ArrayBlockingQueue
作为一个线程安全的队列,允许不同线程安全地添加和移除元素,它内部的同步机制确保了在并发环境下数据的一致性和完整性。 - 生产者-消费者协作:
ArrayBlockingQueue
是实现生产者-消费者模式的理想选择,在生产者-消费者模式中,生产者产生数据放入缓冲区,而消费者从缓冲区中取走数据,ArrayBlockingQueue
的阻塞特性能够自动调节生产者和消费者的速度:当缓冲区满时,生产者会被阻塞直到有空间可用;当缓冲区空时,消费者会被阻塞直到有数据可取。 - 流量控制:
由于ArrayBlockingQueue
是一个有界队列,它可以用来实现流量控制,通过设置队列的最大容量,可以限制系统中待处理的任务或数据的数量,这对于防止资源过载和维持系统的稳定性至关重要。 - 任务调度与负载均衡:
在并发系统中,ArrayBlockingQueue
可以作为任务队列使用,用于存储待执行的任务,线程池中的工作线程可以从队列中取出任务进行处理,从而实现任务的调度和负载均衡。 - 解耦:
使用ArrayBlockingQueue
可以将数据的生产和消费解耦,生产者不需要知道消费者的具体实现,只需要将数据放入队列;同样,消费者也不需要知道生产者的具体实现,只需要从队列中取出数据,这提高了系统的可维护性和可扩展性。 - 缓冲:
ArrayBlockingQueue
作为一个缓冲区,可以平滑生产者和消费者之间的速度差异,当生产者速度较快时,队列可以存储多余的数据;当消费者速度较快时,队列可以提供足够的数据供其消费,这有助于减少系统的响应时间和提高吞吐量。
代码案例
下面是一个简单的Java程序,演示了如何使用ArrayBlockingQueue
类实现一个生产者-消费者场景,其中生产者线程向队列中添加数据,而消费者线程从队列中移除数据,如下代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// 生产者类,用于向队列中添加数据
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int maxSize;
public Producer(BlockingQueue<Integer> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
try {
for (int i = 0; i < maxSize; i++) {
// 模拟生产时间
Thread.sleep((long) (Math.random() * 1000));
queue.put(i); // 将数据放入队列,如果队列已满则阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者类,用于从队列中移除数据
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer consumed = queue.take(); // 从队列中取出数据,如果队列为空则阻塞
System.out.println("Consumed: " + consumed);
// 假设消费完maxSize-1个元素后,消费者就不再消费了
if (consumed == queue.remainingCapacity()) {
break;
}
// 模拟消费时间
Thread.sleep((long) (Math.random() * 1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 主类,包含main方法,用于启动生产者和消费者线程
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
int queueSize = 5; // 队列大小
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize); // 创建一个有界阻塞队列
// 启动生产者线程
Thread producerThread = new Thread(new Producer(queue, queueSize));
producerThread.start();
// 启动消费者线程
Thread consumerThread = new Thread(new Consumer(queue));
consumerThread.start();
}
}
在这个例子中,创建了一个大小为5的ArrayBlockingQueue
,生产者线程会生成从0到4的整数,并尝试将它们放入队列中,如果队列已满,生产者线程会阻塞,直到队列中有空间可用,消费者线程会不断尝试从队列中取出元素,如果队列为空,消费者线程会阻塞,直到队列中有元素可取,生产者和消费者线程都使用了Thread.sleep()
方法来模拟生产和消费的时间延迟。
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Consumed: 4
核心API
ArrayBlockingQueue
实现了一个基于数组的有界阻塞队列,这个队列按照 FIFO(先进先出)的原则对元素进行排序,当尝试向已满的队列中放入元素时,操作将会被阻塞;当尝试从空队列中取出元素时,操作也会被阻塞,以下是ArrayBlockingQueue
类中一些主要方法的含义:
1、核心构造方法
ArrayBlockingQueue(int capacity)
: 创建一个具有给定容量的新的ArrayBlockingQueue
实例。ArrayBlockingQueue(int capacity, boolean fair)
: 创建一个具有给定容量和公平性设置的新ArrayBlockingQueue
实例,如果设置为公平,等待时间最长的线程将获得访问队列的优先权;如果设置为不公平,则访问顺序是不确定的。
2、添加元素
add(E e)
: 将指定的元素插入此队列的尾部,如果队列已满,则抛出IllegalStateException
。offer(E e)
: 将指定的元素插入此队列的尾部,如果队列已满,则返回false
。put(E e) throws InterruptedException
: 将指定的元素插入此队列的尾部,等待必要的空间变得可用,如果当前线程被中断,则抛出InterruptedException
。offer(E e, long timeout, TimeUnit unit)
: 将指定的元素插入此队列的尾部,等待指定的时间以使空间变得可用,如果在指定的时间内队列仍然满,则返回false
。
3、移除元素
remove()
: 移除并返回此队列的头部,如果队列为空,则抛出NoSuchElementException
。poll()
: 移除并返回此队列的头部,或者如果队列为空,则返回null
。take() throws InterruptedException
: 移除并返回此队列的头部,等待元素变得可用,如果当前线程被中断,则抛出InterruptedException
。poll(long timeout, TimeUnit unit)
: 移除并返回此队列的头部,等待指定的时间以使元素可用,如果在指定的时间内队列仍然为空,则返回null
。
4、检查元素
element()
: 获取但不移除此队列的头部,如果队列为空,则抛出NoSuchElementException
。peek()
: 获取但不移除此队列的头部,或者如果队列为空,则返回null
。
5、其他方法
size()
: 返回队列中的元素数量。remainingCapacity()
: 返回队列的理想最大容量与当前大小之间的差值。clear()
: 移除此队列中的所有元素。contains(Object o)
: 如果此队列包含指定的元素,则返回true
。drainTo(Collection<? super E> c)
: 移除此队列中所有可用的元素,并将它们添加到给定的集合中。drainTo(Collection<? super E> c, int maxElements)
: 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定的集合中。toArray()
: 返回以适当顺序包含此队列中所有元素的数组。iterator()
: 返回在此队列的元素上进行迭代的迭代器。
核心总结
ArrayBlockingQueue
是一个非常实用的有界阻塞队列,其优点在于,基于数组实现,内存占用连续,查询速度快;同时支持多线程间的同步操作,能够很好地处理生产者-消费者问题。
另外,它还可以设置公平性,确保等待时间最长的线程优先获取资源。但是,由于是基于数组实现的,所以在初始化时需要指定队列大小,且之后无法改变,这在某些场景下可能不够灵活,当队列满或空时,相关操作会被阻塞,如果处理不当,可能会导致线程挂起或资源浪费。
在使用ArrayBlockingQueue
时,要合理设置队列大小,避免过大或过小,同时,要注意处理阻塞情况,可以通过设置超时时间或使用offer
、poll
等非阻塞方法来避免线程长时间等待,此外,在多线程环境下使用时,要注意线程安全问题,确保数据的正确性和一致性。
END!
往期回顾
Java并发基础:LinkedTransferQueue全面解析!
Java并发基础:BlockingQueue和BlockingDeque接口的区别?