关于阻塞队列

简介: 关于阻塞队列

1 概念和数据结构

支持以下操作的队列:

  • 在检索元素时等待队列变为非空
  • 在存储元素时等待队列中的空间变为可用。

BlockingQueue方法有四种形式,具有不同的处理操作的方法,这些操作不能立即得到满足,但可能在将来的某个时候得到满足:

  • 第一个抛出异常,
  • 第二个返回一个特殊值(null或false,取决于操作),
  • 第三个阻塞当前线程,直到操作成功,
  • 第四个阻塞只有给定的最大时间限制,然后放弃。

这些方法总结如下表: BlockingQueue方法的总结

接下来是BlockingQueue的继承关系图:

BlockingQueue的七大实现类:

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  • DelayedWorkQueue
  • ForwardingBlockingQueue
  • SynchronousQueue
  • DelayQueue
  • LinkedBlockingQueue:由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。
  • PriorityBlockingQueue

    阻塞队列的基本原型:

2 应用场景

  • 消息队列/消息中间件
  • 生产-消费者问题

3 代码

3.1 常用阻塞队列
3.1.1 ArrayBlockingQueue

由数组结构组成的有界阻塞队列。

/**
 * ArrayBlockingQueueTest
 */
public static void ArrayBlockingQueueTest() {
    ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
    try {
        //向队列中添加元素add()方法会抛出异常
        blockingQueue.add("zs");
        blockingQueue.add("ls");
        blockingQueue.add("ww");
        blockingQueue.add("zl");
    } catch (Exception e) {
        //发生异常时超出部分会添加失败
        System.err.println("err -> " + e.getMessage());
    }
    //检查栈顶元素
    System.out.println("栈顶元素 -> " + blockingQueue.element());
    try {
        //从队头排出元素
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
    } catch (Exception e) {
        System.err.println("err -> " + e.getMessage());
    }
    //其他API与Queue使用类似
}
3.1.2 LinkedBlockingQueue

由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。

/**
 * LinkedBlockingQueueTest
 */
public static void LinkedBlockingQueueTest() {
    LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
    blockingQueue.add("zs");
    System.out.println(blockingQueue.remove());
}
3.1.3 SynchronousQueue

不存储元素的阻塞队列,即单个元素的队列,每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

/**
 * SynchronousQueueTest
 */
public static void SynchronousQueueTest() throws InterruptedException {
    SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
    //必须要使用两个线程,一个put一个take
    //以下情况会造成线程阻塞
    //原因:均在主线程中
    /**
     * synchronousQueue.put("zs");
     * synchronousQueue.take();
     */
    //正常使用
    new Thread(() -> {
        try {
            synchronousQueue.put("zs");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            System.out.println(synchronousQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
3.2 使用阻塞队列解决生产-消费者问题
/**
 * @desc: 使用阻塞队列完成生产-消费者模型
 * @author: YanMingXin
 * @create: 2021/8/1-16:00
 **/
public class Test02 {
    /**
     * 声明一个队列
     */
    SynchronousQueue queue = new SynchronousQueue<Integer>();
    /**
     * 生产次数
     */
    public static volatile int productNum = 0;
    /**
     * 消费次数
     */
    public static volatile int consumeNum = 0;
    /**
     * 生产者
     */
    public void producer(Integer val) throws Exception {
        queue.put(val);
        System.out.println("生产 -> " + val);
    }
    /**
     * 消费者
     */
    public void consumer() throws Exception {
        System.out.println("消费 -> " + queue.take());
    }
    /**
     * 测试
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(2);
        Test02 test = new Test02();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(100);
                    test.producer(i);
                    productNum += 1;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(100);
                    test.consumer();
                    consumeNum += 1;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        latch.await();
        System.out.println("生产次数 = " + productNum);
        System.out.println("消费次数 = " + consumeNum);
    }
}

4 底层原理

在这里我们拿ArrayBlockingQueue举例:

ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列,可以按照 FIFO原则对元素进行排序。

  • ArrayBlockingQueue是基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。 并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
  • ArrayBlockingQueue内部通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是在创建创建ArrayBlockingQueue时候指定的。
  • ArrayBlockingQueue和ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁。ArrayBlockingQueue就是根据ReentrantLock互斥锁实现"多线程对共享资源的访问"。ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。
  • ArrayBlockingQueue和Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。使用通知模式实现:所谓通知模式,当生产者往满的队列里面添加元素的时候,会阻塞生产者(调用Condition notFull.await()进行等待);当消费者消费了一个队列中的元素后,会通知(调用Condition notFull.signal()唤醒生产者)生产者当前队列可用。反之,当消费者消费的时候,发现队列是空的,则消费者会被阻塞(通过Condition的 notEmpty.await()进行等待),当生产者插入了队列中的一个元素后,则会调用notEmpty.signal()唤醒消费者继续消费。
/**
     * 构造方法
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //声明一个ReentrantLock
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
   /**
    * put方法
    */
    public void put(E e) throws InterruptedException {
        //检查不为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            //插入元素
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
   /**
    * take方法
    */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            //提取元素当前的取位、前进和信号,仅在持有锁时调用。
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


相关文章
|
存储 安全 Java
阻塞队列《——》特殊的队列(先进先出)
阻塞队列《——》特殊的队列(先进先出)
46 0
|
算法 安全 Java
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
|
7月前
|
存储 安全 Java
实现一个阻塞队列
实现一个阻塞队列
50 0
|
消息中间件 缓存 Java
多线程应用——阻塞队列
用数组实现阻塞队列
67 0
阻塞队列BlockingQueue
阻塞队列BlockingQueue
57 0
阻塞队列BlockingQueue
|
存储 消息中间件 算法
JUC-阻塞队列
问题引出 一.单端阻塞队列(BlockingQueue) 二.双端阻塞队列(BlockingDeque) 三.延迟队列(DelayQueue)
52 0
|
存储 缓存 安全
BlockingQueue阻塞队列原理以及实现
BlockingQueue阻塞队列原理以及实现
131 0
|
存储 缓存 安全
JUC之阻塞队列解读(BlockingQueue)
JUC之阻塞队列解读(BlockingQueue)
|
消息中间件 前端开发 中间件
阻塞队列的理解
阻塞队列的理解
|
存储 消息中间件 安全
关于阻塞队列
关于阻塞队列