BlockingQueue阻塞队列原理以及实现

简介: BlockingQueue阻塞队列原理以及实现

一,BlockingQueue

在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。


用一句话描述这个阻塞队列就是:它是线程的一个通信工具,在任意时刻,不管并发有多高,在单jvm进程上,同一时间永远只有一个线程能够对队列进行入队和出队的操作,它的特性是在任意时刻只有一个线程可以进行take或者put操作。因此这个队列是一个线程安全的队列。


比较适用于生产者和消费者的场景,因此适用的应用场景如下

线程池,springCloud-Eureka的三级缓存,Nacos,Netty,RakectMq等


所有的阻塞队列都都实现了对这个BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E>

1,主要常用的队列有如下

ArrayBlockingQueue: 由数组支持的有界队列

LinkedBlockingQueue: 由链接节点支持的可选有界队列

PriorityBlockingQueue: 由优先级堆支持的无界优先级队列

DelayQueue: 由优先级堆支持的、基于时间的调度队列


2,基本工作原理实现如下

1,以一个有界队列为例,首先消费者这边获取到锁,然后会生产商品,然后会往队列中填满数据,队列填满之后,生产者端会进行阻塞,同时会释放这把锁,并且会通知这个消费者赶紧去消费。当然内部也做了很多事情,不一定就是说一定要阻塞队列满了之后才会去唤醒生产者去消费,而是消费者那边也会有一个监听事件,只有队列不为空,就会有这个消费者来消费。

cd738a35d0674176bbd6ca6159f52c3d.png


2,消费者在接收到生产者的通知之后呢,就会先去获取到这把锁,然后对里面的产品进行消费,当队列里面的产品都被消费完成之后,消费者这边又会释放这把锁,然后将自身阻塞,并同时去唤醒这个生产者继续生产产品。

7c30dac756404693870c1e29894f52ee.png

3,生产者又获取到锁,然后重复执行第一步。


3,基本api使用如下

10db982e24694046ad9e64d4f076da25.png

582ad357cd8b49c2a77e915eda7f4a31.png


二,源码剖析

在了解过一定的工作原理之后,接下来可以对源码分析一波。


2.1,ArrayBlockingQueue

这里主要通过这个ArrayBlockingQueue为例,来描述一下这个阻塞队列的工作流程


         

这个构造方法里面有如下参数

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); //非公平锁
    notEmpty = lock.newCondition(); //条件对象,用于唤醒指定线程
    notFull =  lock.newCondition(); //条件对象
}

生产者会向队列中put产品,生产者后会持有锁,此时会向队列中存放产品,如果队列满了,则会阻塞自己,并且在最后会释放锁。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //生产者加锁
    lock.lockInterruptibly();
    try {
        while (count == items.length) //如果队列满了,则会阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock(); //释放锁
    }
}

既然涉及到ReentrantLock,那么就用从之前的AQS里面讲起了,这里面这要是一个CLH同步等待队列,由一个双向链表和一个同步阻塞器组成,同步阻塞器会有一个state和一个exclusiveOwnerThread状态组成,state=0表示当前没有对象获取到锁,可以来竞争锁。每个结点由一个前驱指针和一个后继指针,并且里面有一个waitStatus等待状态,该状态主要表示下一个结点的存活状态。

f40ec955fc894a199226af019cbc3f97.png

这里的话不会像之前一样使用这个CLH同步等待队列,而是加入了一种新的Condition条件等待队列,如下图。由firstWaiter和nextWaiter组成的单向链表队列,里面的waitStatus为CONDITION:-2 。也就是说如果当前生产者结点后面的结点又是一个生产者节点,因为期间可能存在多个生产者的线程,而为了唤醒接下来的消费者,就会创建一个条件等待队列,去存储后面的生产者结点。

就是说在CLH同步等待队列中,当前结点为生产者的话,在阻塞队列满了之后,如果CLH中的下一个节点还是生产者,则会将waitStatus的状态设置成-2,并将下一个节点移动到这个条件等待队列里面并进行排队,如果下一个结点还是,又会将下一个结点移动到这个条件等待队列里面并进行排队。知道下一个结点是消费者为止。

ac43693eeadc45539e0ababd576d205d.png

await()释放锁的流程如下

public final void await() throws InterruptedException {
    //线程是否被中断,如果被中断,直接抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //条件等待队列,会构建一个新的队列
    Node node = addConditionWaiter();
    //释放锁,并对对应的结点进行唤醒操作
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断当前结点是在条件队列里面还是在同步队列里面
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

构建条件等待队列如下

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

出队,消费者在获取产品时,产品就会出队,与此同时,在队列出队成功之后,队列中就会有一个空位,会调用notFull.signal()方法,通知生产者可以去生产产品了。并将这个条件等待队列放回这个CLH队列里面,只有在CLH队列里面才会获取锁。最后在CLH中才能进行unPark释放锁的操作。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //队列中有空位,通知生产者生产产品
    notFull.signal();
    return x;
}

消费者获取产品

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

三,总结

BlockingQueue也是基于这个AQS的方式实现的,主要是利用这个生产者和消费者这个模型来实现。通过这个AQS中的CLH同步队列来对节点的锁的阻塞和释放,期间利用了这个条件等待队列来实现,如果存在多个生产者的线程的情况下,就会将这些线程加入到一个条件等待的队列里面。并将这个节点的状态改为-2,condition状态。在全部进入条件等待队列之后,这个锁还在并没有释放,因此最后又需要将这个条件等待队列里面的结点加回到CLH同步队列中,再进行排队的释放这个锁。结点出队的时候,然后生产者会通过一个singal监听这个消费者,每当这个阻塞队列里面出队,有一个位置的的时候,生产者就会生产这个产品。消费者也会监听这个队列,队列中只要不为空,就回去消费队列中的产品。


获取锁的条件

只有在CLH队列里等待的Node结点并且前驱结点的 waitStatus 为sinal = -1的可被唤醒的结点。

条件队列里面的这些节点是不能获取到锁的。


相关文章
|
存储 安全 Java
ArrayBlockingQueue 和 LinkedBlockingQueue 有什么区别?
ArrayBlockingQueue 和 LinkedBlockingQueue 有什么区别?
|
4月前
ArrayBlockingQueue原理
文章主要介绍了ArrayBlockingQueue的工作原理。ArrayBlockingQueue通过ReentrantLock和Condition实现了高效的阻塞队列,能够有效地避免CPU资源浪费。它非常适合用于生产者-消费者模型的应用场景,特别是需要控制生产者和消费者线程同步的场合。
|
算法 安全 Java
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
|
7月前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
28 0
|
消息中间件
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
57 0
阻塞队列BlockingQueue
阻塞队列BlockingQueue
54 0
阻塞队列BlockingQueue
|
存储 安全 Java
LinkedBlockingQueue 原理
LinkedBlockingQueue 原理
|
存储 缓存 安全
JUC之阻塞队列解读(BlockingQueue)
JUC之阻塞队列解读(BlockingQueue)
|
缓存 安全 Java
JUC系列学习(四):线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
117 0
基于数组的阻塞队列 ,ArrayBlockingQueue 原理
基于数组的阻塞队列 ,ArrayBlockingQueue 原理
基于数组的阻塞队列 ,ArrayBlockingQueue 原理