堵塞队列BlockingQueue
什么是堵塞队列
堵塞队列本质就是队列,底层数据结构 通常是由数组,或者链表构成。实现FIFO思想
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
注意:bolckingQueue是在多线程环境下提供的线程安全的队列
与ArrayList区别
1、ArrayList线程不安全,blockingQueue线程安全
2、ArrayList可以扩容,blockingQueue队列不能
为什么要使用堵塞队列
1、我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,BlockingQueue都给你一手包办了
2、如果有很多任务要处理,我们当前处理不了,总不能不处理。我们可以延迟处理,总比不处理要好
阻塞队列使用场景
1、生产者消费者模式
传统版(synchronized, wait, notify)
阻塞队列版(lock, await, signal)
2、线程池
3、消息中间件
怎么使用堵塞队列
blockingQueue实现类
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现延迟无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列(生产一个消费一个)。
LinkedTransferQueue:由链表结构绒成的无界阻塞队列。
LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
BlockingQueue核心方法组
offer和poll组
public static void main(String[] args) {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
new Thread(()->{
System.out.println("元素成功入队否 "+blockingQueue.offer("1"));
System.out.println("元素成功入队否 "+blockingQueue.offer("2"));
},"producer1").start();
new Thread(()->{
System.out.println("元素成功入队否 "+blockingQueue.offer("3"));
System.out.println("元素成功入队否 "+blockingQueue.offer("4"));
System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
},"producer2").start();
new Thread(()->{
System.out.println("成功消费数据 "+blockingQueue.poll());
System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
},"consumer").start();
}
结果:
阻塞队列中只能存放指定个数的数据,如果使用offer(),将数据放入队列,当前队列已满,消费线程没有来得及消费,那么offer放入数据会失败
超时的 offer和poll组 与上面代码类似,只不过加了时间限制
put和take组
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
new Thread(()->{
try {
blockingQueue.put("1");
blockingQueue.put("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"producer1").start();
new Thread(()->{
try {
blockingQueue.put("3");
blockingQueue.put("4");
System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"producer2").start();
}
结果:
使用put将数据消息进入队列,如果队列满了,并且没有消费者线程进行消费,那么一直会堵塞线程,只要队列不为满时,将元素放入才不会堵塞线程
SynchronousQueue队列
SynchronousQueue没有容量。
与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。
每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
放一个拿一个,存在一个就不能放了哦
传统模式下的生产者消费者
1、synchronized控制的
class Data{
int number = 0;
AtomicInteger atomicInteger = new AtomicInteger(0);
public void increment(){
synchronized (this){
// 不等于0进行,等待消费者消费
while (number != 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产
number++;
System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);
// 通知消费者消费
this.notify();
}
}
public void decrement(){
synchronized (this){
// 等待生产者生产
while (number == 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费
number--;
System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);
// 通知生产者生产
this.notify();
}
}
}
public static void main(String[] args) {
// 任务: 生产一个消费一个
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer").start();
}
2、lock(ReentrantLock)
class Data{
int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try {
// 不等于0进行,等待消费者消费
while (number != 0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产
number++;
System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);
// 通知消费者消费
condition.signal();
}finally {
lock.unlock();
}
}
public void decrement(){
lock.lock();
try {
// 等待生产者生产
while (number == 0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费
number--;
System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);
// 通知生产者生产
condition.signal();
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
// 任务: 生产一个消费一个
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer").start();
}
两者运行结果:
虚假唤醒问题
存在多个线程并发争抢一个资源。以生产者消费者为例:
我们任务要求,只能生产一个产品消费一个产品。两个生产者生产,两个消费者消费。
当生产者生产完一个产品时,要唤醒等待的线程(notify是随机唤醒)。注意此时有两个消费者线程,一个生产者线程等待。如果cpu的调度权被等待的生产者获取到了,此时生产者在 wait()方法处 会直接往下执行,实际上就生产了两个产品。同理消费者也可能同时消费两个产品
根源在于:换性的线程是直接往下执行的并没有判断是否满足对应条件
产生虚假唤醒的源码
class Data{
int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try {
// 不等于0进行,等待消费者消费
if (number != 0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产
number++;
System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);
// 通知消费者消费
condition.signal();
}finally {
lock.unlock();
}
}
public void decrement(){
lock.lock();
try {
// 等待生产者生产
if (number == 0){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费
number--;
System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);
// 通知生产者生产
condition.signal();
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
// 任务: 生产一个消费一个
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer1").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer2").start();
}
可能的结果
解决:
if该while即可,唤醒的同时,进行再次判断
总结具有 await/wait方法时,需要使用while
Synchronized和Lock区别
1、synchronized属于JVM层面,属于java的关键字
monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象 只能在同步块或者方法中才能调用 wait/ notify等方法)
Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁
2、使用方法:
synchronized:不需要用户去手动释放锁,当synchronized代码执行后,系统会自动让线程释放对锁的占用。
ReentrantLock:则需要用户去手动释放锁,若没有主动释放锁,就有可能出现死锁的现象,需要lock() 和 unlock() 配置try catch语句来完成
3、等待是否中断
synchronized:不可中断,除非抛出异常或者正常运行完成。
ReentrantLock:可中断,可以设置超时方法
设置超时方法,trylock(long timeout, TimeUnit unit)
lockInterrupible() 放代码块中,调用interrupt() 方法可以中断
4、加锁是否公平
synchronized:非公平锁
ReentrantLock:默认非公平锁,构造函数可以传递boolean值,true为公平锁,false为非公平锁
5、锁绑定多个条件Condition
synchronized:没有,要么随机,要么全部唤醒
ReentrantLock:用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像synchronized那样,要么随机,要么全部唤醒
Condition实现精准唤醒线程
任务:
多线程之间按顺序调用,实现 A-> B -> C 三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次
class ShareData{
// 1,2,3 分别标识3个不同的线程A,B,C
int number = 1;
Lock lock = new ReentrantLock();
// condition在哪个线程中就表示是哪个线程的条件
Condition c1 =lock.newCondition();
Condition c2 =lock.newCondition();
Condition c3 =lock.newCondition();
// 功能聚合 任务写在共享资源中
public void print5(){
lock.lock();
try {
// 判断
while (number != 1){
try {
// 当前线程等待
c1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 执行任务
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
System.out.println();
// 唤醒 (干完活后,需要通知B线程执行)
number = 2;
// 通知2号去干活了
c2.signal();
}finally {
lock.unlock();
}
}
public void print10(){
lock.lock();
try {
// 判断
while (number != 2){
try {
// 当前线程等待
c2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 执行任务
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
System.out.println();
// 唤醒 (干完活后,需要通知C线程执行)
number = 3;
// 通知3号去干活了
c3.signal();
}finally {
lock.unlock();
}
}
public void print15(){
lock.lock();
try {
// 判断
while (number != 3){
try {
// 当前线程等待
c3.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 执行任务
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
System.out.println();
// 唤醒 (干完活后,需要通知A线程执行)
number = 1;
// 通知1号去干活了
c1.signal();
}finally {
lock.unlock();
}
}
}
/**
* 1、多线程操作资源类
* 2、判断需不需要等待
* 3、执行任务
* 4、通知其它线程执行
*/
public class ConditionTest {
public static void main(String[] args) {
ShareData shareData = new ShareData();
// Condition在哪个线程,表示是哪个线程的条件
new Thread(()->{
shareData.print5();
},"A").start();
new Thread(()->{
shareData.print10();
},"B").start();
new Thread(()->{
shareData.print15();
},"C").start();
}
}
执行结果
A 0
A 1
A 2
A 3
A 4
B 0
B 1
B 2
B 3
B 4
B 5
B 6
B 7
B 8
B 9
C 0
C 1
C 2
C 3
C 4
C 5
C 6
C 7
C 8
C 9
C 10
C 11
C 12
C 13
C 14
注意: Condition在哪个线程,表示是哪个线程的条件,其它线程可以使用其线程的对应condition精准控制线程调用
BlockingQueue队列下的生产者和消费者
class MyResource {
// 默认开启,进行生产消费
// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改
private volatile boolean FLAG = true;
// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而应该采用依赖注入里面的,构造注入方法传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
public void myProducer() throws Exception{
String data = null;
boolean retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 2秒存入1个data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");
}
public void myConsumer() throws Exception{
String retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
// 2秒存入1个data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );
// 退出消费队列
return;
}
}
}
/**
* 停止生产的判断
*/
public void stop() {
this.FLAG = false;
}
}
public class BlockingQueueProducerConsumer {
public static void main(String[] args) {
// 传入具体的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动\n\n");
try {
myResource.myProducer();
System.out.println("\n");
} catch (Exception e) {
e.printStackTrace();
}
}, "producer").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
// 5秒后,停止生产和消费
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n\n5秒中后,生产和消费线程停止,线程结束");
myResource.stop();
}
}