阻塞队列 BlockingQueue

简介: 学数据结构时学过队列,特点是FIFO,先进先出。那么什么阻塞队列呢?一起来看看。

一、什么是阻塞队列?


阻塞队列,英文名BlockingQueue,顾名思义,首先它是一个队列。阻塞就是说在多线程环境下,线程在某些情况下会被挂起,这就是阻塞,一旦满足条件,又会被唤醒。那么阻塞队列的阻塞体现在何处?当一个线程从阻塞队列中取元素时,如果队列为空了,那么取元素的操作就会被阻塞,直到有其他线程往队列中添加了元素;当一个线程往阻塞队列中添加元素时,如果队列满了,那么添加元素的操作也会被阻塞,直到有其他线程从队列中取走了元素。


二、为什么要用阻塞队列?


有了阻塞队列,我们不需要关心何时阻塞线程,何时唤醒线程。因为这些操作阻塞队列都帮我们做了。队列为空那么取元素的线程会自动被阻塞,队列已满那么添加元素的线程会自动阻塞。


三、阻塞队列架构梳理



image.png

之前我们只知道 Collection 下面有 setlist,其实 queue也是继承了 Collectionqueue的子类就是 BlockingQueue,不过它还是接口,它总共有九个实现类,下面挑三个最重要的说一说。


image.png


  • ArrayBlockingQueue:按照 ArrayList 来理解,由数组结构组成的有界阻塞队列。何为有界?其实就是队列的容量。看一下它的构造方法:

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
}


容量调用构造方法时必传的参数,默认是非公平的,也就是说,它的方法都用Lock上了锁,我们知道lock可以通过true或者false来指定使用公平锁还是非公平锁。所以这里传的false或者true其实是给lock用的。


  • LinkedBlockingQueue:按照 LinkedList 来理解,由链表结构组成的有界阻塞队列。其实这里有个坑爹的地方,先来看一下它的构造方法:

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}


它有个无参构造,我们可以不传容量,默认是 ** int类型的最大值**,这不是坑爹嘛,int类型最大值是21亿多,这不就相当于无界嘛。当然,它也有带参构造,可以指定容量,但是不能指定使用公平还是非公平锁,默认使用的是非公平锁。


  • SynchronousQueue:这个队列比较特殊,只能存储一个元素。里面有一个元素的时候,添加的线程就会被阻塞。

四、阻塞队列核心方法


阻塞队列的核心方法有四组,如下表:


方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove poll() take() poll(time,unit)
检查 element() peek()


抛出异常就是说队列中没有元素还去取或者元素满了还添加,那么就抛出异常。其他几组也是按照这思路理解,都是顾名思义的,此处不再啰嗦。


五、阻塞队列用在哪?


1、用在哪?


阻塞队列有哪些应用呢?常见的有以下三个:


  • 生产消费模式
  • 线程池
  • 消息中间件


本文将讲解生产消费模式中如何使用阻塞队列。


2、生产消费模式:


  • 多线程编程的口诀
    线程   操纵   资源类,判断  干活  通知;防止虚假唤醒(判断一定要用while,不能用if)。
  • 基础版生产消费模式
    资源类:

class Resource{
    private  Integer num = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    public void produce(){ // 生产的方法
        lock.lock();
        try {
            // 1.判断
            while (num != 0){
                // 等待,不能生产
                condition.await();
            }
            // 2.干活
            num ++;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // 3.通知
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void consum(){ // 消费的方法
        lock.lock();
        try {
            // 1.判断
            while (num == 0){
                // 等待,不能消费
                condition.await();
            }
            // 2.干活
            num --;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // 3.通知
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}


线程操纵资源类:

public static void main(String[] args){
        Resource resource = new Resource();
        new Thread(() -> {
            for (int i=1; i<=5; i++){
                resource.produce();
            }
        },"A").start();
        new Thread(() -> {
            for (int i=1; i<=5; i++){
                resource.consum();
            }
        },"B").start();
}


这就是基础版的生产消费模式。准确的说应该是2.0版本,最开始学的是使用synchronized实现的。那么,synchronized和lock到底有什么区别呢?


  • synchronized 和 lock 的区别:


原始构成 使用方法 等待是否可中断 是否公平 唤醒
synchronized 是关键字,属于JVM层面,底层通过monitor对象来完成 不需要用户手动释放锁,锁住的代码执行完后系统会自动让线程释放对锁的占用 不可中断,除非抛异常或者正常运行完成 非公平锁 只能随机唤醒一个或者唤醒所以线程
ReentrantLock 是JUC中的一个类,是API层面的锁 需要手动释放锁,若没释放,则可能导致死锁 可中断 默认非公平,可设置为公平锁 可以精确唤醒


这里来说一说ReentrantLock的精确唤醒。现有题目如下:

有A、B、C三个线程,A打印5次,B打印10次,C打印15次,然后又是A打印5次,B打印10次,C打印15次……循环10轮。


这就是经典的线程按序交替问题。看看如何使用 ReentrantLock 来解决。

/** 资源类 */
class Resource{
    private   Integer flag = 1; // 1: A执行,2:B执行,3:C执行
    private  Lock lock = new ReentrantLock();
    public Condition condition1 = lock.newCondition();
    public Condition condition2 = lock.newCondition();
    public  Condition condition3 = lock.newCondition();
    public void print(Condition waitCondition, Condition signalCondition, 
                      Integer num, Integer nowFlag, Integer changedFlag){
        lock.lock();
        try { // 1. 判断
            while (this.flag != nowFlag){
                waitCondition.await();
            }
            // 2. 干活
            for (int i=1; i<=num; i++){
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 3. 通知
            this.flag = changedFlag;
            signalCondition.signal();
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }
}


这是资源类,一把锁,因为有三个线程,需要精确唤醒,就需要三个condition;flag是一个标识,用来判断是哪个线程进行执行;print方法有5个参数,第一个是等待的线程的condition,第二个是需要唤醒的线程的condition,第三个是打印的次数,第四个是当前的flag,第五个是需要修改的flag值。看看线程如何操作这个资源类:

public static void main(String[] args){
        Resource resource = new Resource();
        new Thread(() -> {
            for (int i=1; i<=10; i++){
                resource.print(resource.condition1, resource.condition2, 5, 1, 2);
                System.out.println("========================================================");
               }
           },"A").start();
        new Thread(() -> {
            for (int i=1; i<=10; i++){
                resource.print(resource.condition2, resource.condition3, 10, 2, 3);
                System.out.println("========================================================");
            }
        },"B").start();
        new Thread(() -> {
            for (int i=1; i<=10; i++){
                resource.print(resource.condition3, resource.condition1, 15, 3, 1);
                System.out.println("========================================================");
            }
        },"C").start();
}


首先创建A线程,看看A线程调用print方法传入参数后是什么情况:

lock.lock();
try { // 1. 判断
    while (this.flag != 1){
        condition1.await();
    }
    // 2. 干活
    for (int i=1; i<=5; i++){
        System.out.println(Thread.currentThread().getName() + "\t" + i);
    }
    // 3. 通知
    this.flag = 2;
    condition2.signal();
}catch (Exception e){ 
}finally {
    lock.unlock();
}


线程A调用时,首先判断flag是不是1,如果不是,那么线程A就等待,否则就干活,打印5次。干完活要让线程B执行,所以将flag修改为2,然后将线程B唤醒。线程B调用时,自己干完活就唤醒C,线程C干完活就唤醒A……所以执行结果就是:


image.png


  • 阻塞队列版生产消费模式:


资源类:

class Resource {
    private volatile boolean flag = true; // 标识
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<Integer> blockingQueue = null;
    public Resource(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }
    // 生产的方法
   public void produce() throws InterruptedException {
        boolean result;
        Integer data = null;
        while (flag){
            data = num.incrementAndGet();
            queue.put(data);
            System.out.println("【阻塞队列版生产消费模式】生产成功!");
        }
    }
    // 消费的方法
    public void consume() throws InterruptedException {
        Integer data = null;
        while (flag){
            data = queue.take();
            System.out.println("【阻塞队列版生产消费模式】消费成功!");
        }
    }
    // 停止的方法
    public void stop() throws Exception {
        this.flag = false;
    }
}


首先,当flag为true时,进行生产,就是将 atomicInteger 进行自增,放到阻塞队列中;放完一个就休息1秒钟。然后是消费的方法,也是当flag为true就进行消费,消费就是从阻塞队列中取出元素,如果取到的是 null,说明队列中没有元素了,就将flaf设为false,退出循环,停止消费。 最后的停止的方法,就是将flag设为false,这样生产和消费都会停止。


操纵资源类:

public static void main(String[] args) throws Exception{
        Resource resource = new Resource(new ArrayBlockingQueue<>(6));
        new Thread(() -> {
            System.out.println("生产线程启动!");
            try {
                resource.produce();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "生产线程 ").start();
        new Thread(() -> {
            System.out.println("消费线程启动!");
            try {
                resource.consume();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "消费线程 ").start();
        // 5秒钟后停止
        TimeUnit.SECONDS.sleep(5);
        System.out.println();
        System.out.println("5秒钟后停止生产!");
        resource.stop();
}


这里是创建了两个线程,一个调用生产方法,一个调用消费方法;然后让主线程睡5秒调用停止的方法。看看运行结果:


image.png


这就是阻塞队列版的生产消费模式,不用我们去控制线程的通信。



相关文章
|
5月前
|
算法
LinkedBlockingQueue
LinkedBlockingQueue
45 0
|
算法 安全 Java
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
|
8月前
|
存储 安全 Java
实现一个阻塞队列
实现一个阻塞队列
52 0
|
8月前
|
存储 消息中间件 安全
关于阻塞队列
关于阻塞队列
61 0
阻塞队列BlockingQueue
阻塞队列BlockingQueue
59 0
阻塞队列BlockingQueue
|
存储 缓存 安全
BlockingQueue阻塞队列原理以及实现
BlockingQueue阻塞队列原理以及实现
134 0
|
存储 缓存 安全
JUC之阻塞队列解读(BlockingQueue)
JUC之阻塞队列解读(BlockingQueue)
|
消息中间件 前端开发 中间件
阻塞队列的理解
阻塞队列的理解
|
算法
BlockingQueue二
接着上篇BlockingQueue没讲完的 LinkedTransferQueue LinkedTransferQueue是一个由链表结构组成的无界阻塞队列,相对于其它阻塞队列,LinkedBlockingQueue可以算是LinkedBlockingQueue与SynhronoousQueue结合,LinkedtransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部结构分为数据节点、请求节点,基于CAS无锁算法实现
134 0
BlockingQueue二
|
缓存 安全 Java
JUC系列学习(四):线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
121 0

热门文章

最新文章

下一篇
开通oss服务