【JavaEE】多线程之阻塞队列(BlockingQueue)

简介: 【JavaEE】多线程之阻塞队列(BlockingQueue)

1.了解阻塞队列

阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.

阻塞队列顾名思义就是带有阻塞特性的队列的,它是如何运行的呢?

答: 1.当队列为空时,尝试出队列,就会发生阻塞,直到队列不空为止。

      2.当队列为满时,尝试入队列,就会发送阻塞,直到队列不满为止

阻塞队列在 多线程中非常有用,它可以调节生产者和消费者之间的关系,当生产者生产的资源太多,以至于消费者无法完全消费完,那么可以让生产者阻塞,达到生产、消费平衡的作用。


2.生产者消费者模型又是什么?

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。


它将所有工作分工明确,参考下面这个例子:

有4个人我们一起包饺子,我们每个人都各顾各的,每个人都得做擀面皮和包饺子,如果这样的话,每个人的包饺子的效率都不是很高。若我们进行分工,1个人负责擀面皮,3个人负责包饺子,擀面皮的不管包饺子的是如何包,包饺子的不管擀面皮的是如何擀的。在第二种方式中擀面皮的就可以看作生产者,包饺子的就可以看作消费者。

2.1生产者消费者模型的优点

2.1.1降低服务器与服务器之间耦合度

看如下解读:开始时A服务器和B服务器通过阻塞对象进行交互,若B服务器挂了,那么我们可以迅速的做出反应,参考阻塞队列进行重新加载B服务器。还有就是若A服务器业务服务器,B服务器是实现业务的服务区,因为业务服务器需要时常更新代码,因此可以阻塞队列相当于一个缓存区。



2.1.2“削峰填谷”平衡消费者和生产的处理能力

比如在 "秒杀" (购买物品)场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.在这里就起到了削峰的效果。



“ 削峰填谷”我们可以联想到三峡大坝:

三峡可谓是“功在当代,利在千秋”,为何这样说?三峡大坝它可不仅仅只是发电用的,它还有其他的功能。当到达雨季时,三峡上游的水向下流的会比往常多,这时开始关闸蓄水,防止下游发生洪灾,这就是“削峰”,过了雨季,下游需要灌溉庄稼,但是下游水又少了,这时打开闸门进行放水,造福下游,这就是“填谷”。


在购物系统中,我们就可以将秒杀时突然猛增的用户请求比喻成雨季的雨水,我们通过阻塞队列进行蓄洪,当在平常时,没那么多的用户请求时,我们就是慢慢的解决用户的请求比喻成开闸放水。


3.标准库中的阻塞队列(BlockingQueue

BlockingQueue的成员:

ArrayBlockingQueue:基于数组的阻塞队列实现

LinkedBlockingQueue:基于链表的阻塞队列

PriorityBlockingQueue:基于优先队列的阻塞队列

BlockingQueue:这个是实现阻塞队列的接口,继承与Queue

常用方法介绍:


1688789730726.png


注意:在这个些方法中,只有put()方法和take()方法带有阻塞特性,其他的方法不具备阻塞特性。

3.1基于标准库(BlockingQueue)实现一个简单的阻塞队列的应用

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class testDemo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer>  queue=new LinkedBlockingDeque<>();
        //生产者
        Thread producer=new Thread(()->{
            int i=0;
            while (true) {
                try {
                    System.out.println("生产:"+i);
                    queue.put(i);
                    i++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        //消费者
        Thread customer=new Thread(()-> {
            while (true) {
                try {
                    int value=queue.take();
                    System.out.println("消费:"+value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}

代码运行结果:



4.基于循环队列模拟是实现阻塞队列

这此次模拟实现中,仅模拟了put()方法,和take()方法。


代码实现如下:

public class MyBlockingDeque {
    private int[] items=new int[1000];
    //头指针
    volatile private int head;
    //尾指针
    volatile private int tail;
    //队列中元素数量
    volatile private int size;
     synchronized public void put(int value) throws InterruptedException {
        //如果队列已满,阻塞等待
        while (size == items.length) {
            this.wait();
        }
        items[tail]=value;
        tail++;
        //如果队列满了,从头再来(循坏队列思想)
        if(tail==items.length) {
            tail=0;
        }
        size++;
        //唤醒take
        this.notify();
    }
     synchronized public int take() throws InterruptedException {
        while (size==0){
            //等待放入元素,等待put唤醒
            this.wait();
        }
        int value=items[head];
        head++;
        //如果头到了队尾,重新加载
        if(head == items.length) {
            head=0;
        }
        size--;
        this.notify();
        return value;
    }
}

4.1 put()方法

put 方法是向队列中存放数据,因此第一步我们需要判断队列满没满,若队列满了的话,就让队列通过wait方法进入阻塞状态,等待唤醒。若队列没有满,就将数据放入到尾指针所指位置,让尾指针向后移动一位;当尾指针达到数组的长度时,我们就需要重置尾指针了。

synchronized public void put(int value) throws InterruptedException {
        //如果队列已满,阻塞等待
        while (size == items.length) {
            this.wait();
        }
        items[tail]=value;
        tail++;
        //如果队列满了,从头再来(循坏队列思想)
        if(tail==items.length) {
            tail=0;
        }
        size++;
        //唤醒take
        this.notify();
    }

4.2 take()方法

take 方法是在队列中拿取数据,有两种情况,若队列空,发生阻塞,直到put进来数据,若不为空,拿到数据,头指针向后走,如果头指针达到数组长度,重置头指针,最后返回value。

synchronized public int take() throws InterruptedException {
        while (size==0){
            //等待放入元素,等待put唤醒
            this.wait();
        }
        int value=items[head];
        head++;
        //如果头到了队尾,重新加载
        if(head == items.length) {
            head=0;
        }
        size--;
        //唤醒put
        this.notify();
        return value;
    }

4.3put方法和take方法是如何被唤醒的

1.当存入数据时,队列为满的时候,队列就会通过wait方法使线程进入阻塞态,它什么时候被唤醒呢?队列被拿走一个数据时,被唤醒,这个唤醒是在take中唤醒的,take拿走一个数据队列不为满,唤醒put。

2. 当拿去数据时,队列为空的时候,队列会通过while判断,让线程进入阻塞态,当有新的数据被放入时,take就会被唤醒。



相关文章
|
5月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
23天前
|
Java
【JavaEE】——多线程常用类
Callable的call方法,FutureTask类,ReentrantLock可重入锁和对比,Semaphore信号量(PV操作)CountDownLatch锁存器,
|
23天前
|
Java 关系型数据库 MySQL
【JavaEE“多线程进阶”】——各种“锁”大总结
乐/悲观锁,轻/重量级锁,自旋锁,挂起等待锁,普通互斥锁,读写锁,公不公平锁,可不可重入锁,synchronized加锁三阶段过程,锁消除,锁粗化
|
23天前
|
Java Go 调度
【JavaEE】——线程池大总结
线程数量问题解决方式,代码实现线程池,ThreadPoolExecutor(核心构造方法),参数的解释(面试:拒绝策略),Executors,工厂模式,工厂类
|
23天前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
23天前
|
Java 调度
|
23天前
|
Java 调度
【JavaEE】——线程的安全问题和解决方式
【JavaEE】——线程的安全问题和解决方式。为什么多线程运行会有安全问题,解决线程安全问题的思路,synchronized关键字的运用,加锁机制,“锁竞争”,几个变式
|
23天前
|
Java API 调度
【JavaEE】——多线程(join阻塞,计算,引用,状态)
【JavaEE】——多线程,join,sleep引起的线程阻塞,多线程提升计算效率,如何获取线程的引用和状态
|
23天前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
23天前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程