【JavaEE】多线程之阻塞队列(BlockingQueue)

简介: 【JavaEE】多线程之阻塞队列(BlockingQueue)

1.了解阻塞队列

阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.

阻塞队列顾名思义就是带有阻塞特性的队列的,它是如何运行的呢?

答: 1.当队列为空时,尝试出队列,就会发生阻塞,直到队列不空为止。

      2.当队列为满时,尝试入队列,就会发送阻塞,直到队列不满为止

阻塞队列在 多线程中非常有用,它可以调节生产者和消费者之间的关系,当生产者生产的资源太多,以至于消费者无法完全消费完,那么可以让生产者阻塞,达到生产、消费平衡的作用。


2.生产者消费者模型又是什么?

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。


它将所有工作分工明确,参考下面这个例子:

有4个人我们一起包饺子,我们每个人都各顾各的,每个人都得做擀面皮和包饺子,如果这样的话,每个人的包饺子的效率都不是很高。若我们进行分工,1个人负责擀面皮,3个人负责包饺子,擀面皮的不管包饺子的是如何包,包饺子的不管擀面皮的是如何擀的。在第二种方式中擀面皮的就可以看作生产者,包饺子的就可以看作消费者。

2.1生产者消费者模型的优点

2.1.1降低服务器与服务器之间耦合度

看如下解读:开始时A服务器和B服务器通过阻塞对象进行交互,若B服务器挂了,那么我们可以迅速的做出反应,参考阻塞队列进行重新加载B服务器。还有就是若A服务器业务服务器,B服务器是实现业务的服务区,因为业务服务器需要时常更新代码,因此可以阻塞队列相当于一个缓存区。



2.1.2“削峰填谷”平衡消费者和生产的处理能力

比如在 "秒杀" (购买物品)场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.在这里就起到了削峰的效果。



“ 削峰填谷”我们可以联想到三峡大坝:

三峡可谓是“功在当代,利在千秋”,为何这样说?三峡大坝它可不仅仅只是发电用的,它还有其他的功能。当到达雨季时,三峡上游的水向下流的会比往常多,这时开始关闸蓄水,防止下游发生洪灾,这就是“削峰”,过了雨季,下游需要灌溉庄稼,但是下游水又少了,这时打开闸门进行放水,造福下游,这就是“填谷”。


在购物系统中,我们就可以将秒杀时突然猛增的用户请求比喻成雨季的雨水,我们通过阻塞队列进行蓄洪,当在平常时,没那么多的用户请求时,我们就是慢慢的解决用户的请求比喻成开闸放水。


3.标准库中的阻塞队列(BlockingQueue

BlockingQueue的成员:

ArrayBlockingQueue:基于数组的阻塞队列实现

LinkedBlockingQueue:基于链表的阻塞队列

PriorityBlockingQueue:基于优先队列的阻塞队列

BlockingQueue:这个是实现阻塞队列的接口,继承与Queue

常用方法介绍:


1688789730726.png


注意:在这个些方法中,只有put()方法和take()方法带有阻塞特性,其他的方法不具备阻塞特性。

3.1基于标准库(BlockingQueue)实现一个简单的阻塞队列的应用

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class testDemo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer>  queue=new LinkedBlockingDeque<>();
        //生产者
        Thread producer=new Thread(()->{
            int i=0;
            while (true) {
                try {
                    System.out.println("生产:"+i);
                    queue.put(i);
                    i++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        //消费者
        Thread customer=new Thread(()-> {
            while (true) {
                try {
                    int value=queue.take();
                    System.out.println("消费:"+value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}

代码运行结果:



4.基于循环队列模拟是实现阻塞队列

这此次模拟实现中,仅模拟了put()方法,和take()方法。


代码实现如下:

public class MyBlockingDeque {
    private int[] items=new int[1000];
    //头指针
    volatile private int head;
    //尾指针
    volatile private int tail;
    //队列中元素数量
    volatile private int size;
     synchronized public void put(int value) throws InterruptedException {
        //如果队列已满,阻塞等待
        while (size == items.length) {
            this.wait();
        }
        items[tail]=value;
        tail++;
        //如果队列满了,从头再来(循坏队列思想)
        if(tail==items.length) {
            tail=0;
        }
        size++;
        //唤醒take
        this.notify();
    }
     synchronized public int take() throws InterruptedException {
        while (size==0){
            //等待放入元素,等待put唤醒
            this.wait();
        }
        int value=items[head];
        head++;
        //如果头到了队尾,重新加载
        if(head == items.length) {
            head=0;
        }
        size--;
        this.notify();
        return value;
    }
}

4.1 put()方法

put 方法是向队列中存放数据,因此第一步我们需要判断队列满没满,若队列满了的话,就让队列通过wait方法进入阻塞状态,等待唤醒。若队列没有满,就将数据放入到尾指针所指位置,让尾指针向后移动一位;当尾指针达到数组的长度时,我们就需要重置尾指针了。

synchronized public void put(int value) throws InterruptedException {
        //如果队列已满,阻塞等待
        while (size == items.length) {
            this.wait();
        }
        items[tail]=value;
        tail++;
        //如果队列满了,从头再来(循坏队列思想)
        if(tail==items.length) {
            tail=0;
        }
        size++;
        //唤醒take
        this.notify();
    }

4.2 take()方法

take 方法是在队列中拿取数据,有两种情况,若队列空,发生阻塞,直到put进来数据,若不为空,拿到数据,头指针向后走,如果头指针达到数组长度,重置头指针,最后返回value。

synchronized public int take() throws InterruptedException {
        while (size==0){
            //等待放入元素,等待put唤醒
            this.wait();
        }
        int value=items[head];
        head++;
        //如果头到了队尾,重新加载
        if(head == items.length) {
            head=0;
        }
        size--;
        //唤醒put
        this.notify();
        return value;
    }

4.3put方法和take方法是如何被唤醒的

1.当存入数据时,队列为满的时候,队列就会通过wait方法使线程进入阻塞态,它什么时候被唤醒呢?队列被拿走一个数据时,被唤醒,这个唤醒是在take中唤醒的,take拿走一个数据队列不为满,唤醒put。

2. 当拿去数据时,队列为空的时候,队列会通过while判断,让线程进入阻塞态,当有新的数据被放入时,take就会被唤醒。



相关文章
|
6天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
30 0
|
6天前
|
存储 安全 Java
并发编程知识点(volatile、JMM、锁、CAS、阻塞队列、线程池、死锁)
并发编程知识点(volatile、JMM、锁、CAS、阻塞队列、线程池、死锁)
72 3
|
6天前
|
设计模式 消息中间件 安全
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
13 0
|
6天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
6天前
|
消息中间件 监控 安全
【JAVAEE学习】探究Java中多线程的使用和重点及考点
【JAVAEE学习】探究Java中多线程的使用和重点及考点
|
6天前
|
监控 安全 Java
【JavaEE多线程】深入解析Java并发工具类与应用实践
【JavaEE多线程】深入解析Java并发工具类与应用实践
33 1
|
6天前
|
安全 Java API
JavaEE多线程】深入理解CAS操作:无锁编程的核心
JavaEE多线程】深入理解CAS操作:无锁编程的核心
20 0
|
6天前
|
算法 Java 编译器
【JavaEE多线程】掌握锁策略与预防死锁
【JavaEE多线程】掌握锁策略与预防死锁
24 2
|
6天前
|
设计模式 安全 Java
【JavaEE多线程】从单例模式到线程池的深入探索
【JavaEE多线程】从单例模式到线程池的深入探索
22 2
|
6天前
|
安全 Java 编译器
【JavaEE多线程】线程安全、锁机制及线程间通信
【JavaEE多线程】线程安全、锁机制及线程间通信
33 1