内容概要
LinkedBlockingQueue
类是以链表结构实现高效线程安全队列,具有出色的并发性能、灵活的阻塞与非阻塞操作,以及适用于生产者和消费者模式的能力,此外,LinkedBlockingQueue
还具有高度的可伸缩性,能够在多线程环境中有效管理数据共享,是提升程序并发性能和稳定性的关键组件。
核心概念
假如有一个在线购物平台,这个平台需要处理大量的订单,每当有用户下单,系统就需要将这个订单信息传递给后台的处理程序进行进一步的处理,比如库存管理、支付处理、物流分配等,但是,由于用户量巨大,订单量也非常大,如果直接让处理程序立即处理每一个订单,那么系统可能会因为处理不过来而崩溃。
这是,可以使用LinkedBlockingQueue
来解决这问题,可以把LinkedBlockingQueue
想象成一个排队等候区,每当有用户下单,订单信息就被放到这个排队等候区里面,后台的处理程序则可以从这个排队等候区里面取出订单进行处理,由于LinkedBlockingQueue
是一个线程安全的队列,所以它可以保证在多线程环境下,订单信息不会被重复处理或丢失。
而且,LinkedBlockingQueue
还有一个很有用的特性,就是它可以在插入元素时设置等待时间,因此,如果排队等候区已经满了(达到了最大容量),新的订单信息就需要等待直到有空间可以放入,通过设置等待时间,可以控制新订单信息需要等待多久,如果超过了这个时间还没有空间,那么就可以抛出一个异常或者进行其他的处理。
LinkedBlockingQueue
主要用来解决多线程间的数据共享和传输问题,尤其是在生产者-消费者场景中。这是一个线程安全的阻塞队列,它实现了BlockingQueue
接口,并基于链表数据结构。
LinkedBlockingQueue
通常用来解决以下情况问题:
- 线程安全的数据共享:在多线程环境中,当多个线程需要访问和修改共享数据时,可能会出现数据不一致的问题,
LinkedBlockingQueue
通过内部的锁机制和其他同步措施,确保在并发情况下数据的完整性和一致性。 - 生产者-消费者问题:这是并发编程中的一个经典问题,其中生产者生成数据放入缓冲区,消费者从缓冲区取出数据,如果缓冲区已满,生产者需要等待;如果缓冲区为空,消费者需要等待,
LinkedBlockingQueue
提供了阻塞的put()
和take()
方法,使得生产者和消费者可以在队列满或空时自动阻塞等待,从而简化了编程复杂度。 - 流量控制:在高并发系统中,如果后端处理速度跟不上前端生成数据的速度,可能会导致系统崩溃,通过使用
LinkedBlockingQueue
,可以设置一个有限的队列容量,当队列满时,生产者会被阻塞,从而实现了对流量的控制。 - 解耦:
LinkedBlockingQueue
还可以用于解耦生产者和消费者之间的直接依赖关系,生产者只需要将数据放入队列,而不需要关心消费者何时或如何消费这些数据;同样,消费者只需要从队列中取出数据处理,而不需要关心数据是谁生产的或如何生产的。这种解耦有助于提高系统的可维护性和可扩展性。
代码案例
下面使用LinkedBlockingQueue
类模拟了一个生产者-消费者场景,其中生产者生成整数并将它们放入队列,而消费者从队列中取出整数并处理它们,如下:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 生产者类,用于生成数据并放入队列
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(200); // 模拟生产时间
int item = i;
System.out.println("生产者生产了: " + item);
queue.put(item); // 将生产的数据放入队列
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者类,用于从队列中取出数据并处理
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 从队列中取出数据,如果队列为空则阻塞
System.out.println("消费者消费了: " + item);
if (item == 9) {
// 假设生产10个元素后结束消费
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 主类,用于演示生产者和消费者的使用
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // 创建一个容量为10的LinkedBlockingQueue
// 启动生产者线程
Thread producerThread = new Thread(new Producer(queue));
producerThread.start();
// 启动消费者线程
Thread consumerThread = new Thread(new Consumer(queue));
consumerThread.start();
// 等待两个线程执行完成(在实际应用中,可能需要使用更复杂的同步机制来等待线程结束)
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("程序执行完毕");
}
}
在上面代码中,定义了一个生产者类Producer
和一个消费者类Consumer
,它们都实现了Runnable
接口,因此可以作为线程运行,生产者在其run
方法中循环生成整数,并通过queue.put(item)
方法将它们放入队列中,消费者在其run
方法中通过queue.take()
方法从队列中取出整数并处理它们,如果队列为空,take()
方法会阻塞,直到队列中有元素可用。
在main
方法中,创建了一个容量为10的LinkedBlockingQueue
实例,并分别启动了一个生产者线程和一个消费者线程,程序会等待这两个线程执行完成后输出“程序执行完毕”。
核心API
LinkedBlockingQueue
是实现了BlockingQueue
接口,是一个基于链表的、线程安全的阻塞队列,以下是LinkedBlockingQueue
类中一些主要方法的含义:
- 构造方法
LinkedBlockingQueue()
:创建一个具有默认容量(Integer.MAX_VALUE)的LinkedBlockingQueue
。LinkedBlockingQueue(int capacity)
:创建一个具有指定容量的LinkedBlockingQueue
。
- 添加元素
add(E e)
:将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回true
,如果当前没有可用的空间,则抛出IllegalStateException
。offer(E e)
:将指定的元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回true
,如果当前没有可用的空间,则返回false
。put(E e) throws InterruptedException
:将指定的元素插入此队列中,等待必要的空间可用;如果空间不可用,则等待直到空间可用或线程被中断。offer(E e, long timeout, TimeUnit unit)
:将指定的元素插入此队列中,等待指定的时间以允许其他线程插入或移除元素,或者直到此线程被中断。
- 移除元素
remove()
:移除并返回此队列的头部,如果此队列为空,则抛出NoSuchElementException
。poll()
:移除并返回此队列的头部,如果此队列为空,则返回null
。take() throws InterruptedException
:移除并返回此队列的头部,等待(如果必要)直到元素变得可用或线程被中断。poll(long timeout, TimeUnit unit)
:移除并返回此队列的头部,等待指定的时间以允许其他线程插入元素,或者直到此线程被中断或超时。
- 检查元素
element()
:检索但不移除此队列的头部,如果此队列为空,则抛出NoSuchElementException
。peek()
:检索但不移除此队列的头部,如果此队列为空,则返回null
。
- 其它有用的方法
size()
:返回队列中的元素数量。这个方法的精确度可能会受到并发修改的影响,因此它主要用于监控,而不是用于同步控制。remainingCapacity()
:返回理想情况下(没有内存和资源约束)此队列可接受的额外元素数量。clear()
:移除队列中的所有元素。contains(Object o)
:检查队列中是否包含指定的元素。drainTo(Collection<? super E> c)
和drainTo(Collection<? super E> c, int maxElements)
:将队列中的元素移除并添加到指定的集合中,直到队列为空或移除了指定数量的元素。iterator()
:返回队列中元素的迭代器。注意,迭代器的remove()
方法会删除队列中的元素,但是不会自动调整队列的容量。toArray()
:返回包含队列中所有元素的数组。
核心总结
LinkedBlockingQueue
实现了BlockingQueue
接口,以链表结构存储元素,保证了线程安全,其优点在于具有高效的并发性能和可伸缩性,适用于生产者和消费者模式,能够很好地处理多线程间的数据共享问题,它的缺点是在比如数据量非常大时,由于链表结构的内存开销,可能会占用较多内存,此外,虽然它提供了阻塞和非阻塞的操作,但在高并发场景下,线程间的竞争可能导致性能下降。
END!
往期回顾
Java并发基础:LinkedBlockingDeque全面解析!
Java并发基础:LinkedTransferQueue全面解析!