一,BlockingQueue
在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。
用一句话描述这个阻塞队列就是:它是线程的一个通信工具,在任意时刻,不管并发有多高,在单jvm进程上,同一时间永远只有一个线程能够对队列进行入队和出队的操作,它的特性是在任意时刻只有一个线程可以进行take或者put操作。因此这个队列是一个线程安全的队列。
比较适用于生产者和消费者的场景,因此适用的应用场景如下
线程池,springCloud-Eureka的三级缓存,Nacos,Netty,RakectMq等
所有的阻塞队列都都实现了对这个BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E>
1,主要常用的队列有如下
ArrayBlockingQueue: 由数组支持的有界队列
LinkedBlockingQueue: 由链接节点支持的可选有界队列
PriorityBlockingQueue: 由优先级堆支持的无界优先级队列
DelayQueue: 由优先级堆支持的、基于时间的调度队列
2,基本工作原理实现如下
1,以一个有界队列为例,首先消费者这边获取到锁,然后会生产商品,然后会往队列中填满数据,队列填满之后,生产者端会进行阻塞,同时会释放这把锁,并且会通知这个消费者赶紧去消费。当然内部也做了很多事情,不一定就是说一定要阻塞队列满了之后才会去唤醒生产者去消费,而是消费者那边也会有一个监听事件,只有队列不为空,就会有这个消费者来消费。
2,消费者在接收到生产者的通知之后呢,就会先去获取到这把锁,然后对里面的产品进行消费,当队列里面的产品都被消费完成之后,消费者这边又会释放这把锁,然后将自身阻塞,并同时去唤醒这个生产者继续生产产品。
3,生产者又获取到锁,然后重复执行第一步。
3,基本api使用如下
二,源码剖析
在了解过一定的工作原理之后,接下来可以对源码分析一波。
2.1,ArrayBlockingQueue
这里主要通过这个ArrayBlockingQueue为例,来描述一下这个阻塞队列的工作流程
这个构造方法里面有如下参数
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //非公平锁 notEmpty = lock.newCondition(); //条件对象,用于唤醒指定线程 notFull = lock.newCondition(); //条件对象 }
生产者会向队列中put产品,生产者后会持有锁,此时会向队列中存放产品,如果队列满了,则会阻塞自己,并且在最后会释放锁。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //生产者加锁 lock.lockInterruptibly(); try { while (count == items.length) //如果队列满了,则会阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); //释放锁 } }
既然涉及到ReentrantLock,那么就用从之前的AQS里面讲起了,这里面这要是一个CLH同步等待队列,由一个双向链表和一个同步阻塞器组成,同步阻塞器会有一个state和一个exclusiveOwnerThread状态组成,state=0表示当前没有对象获取到锁,可以来竞争锁。每个结点由一个前驱指针和一个后继指针,并且里面有一个waitStatus等待状态,该状态主要表示下一个结点的存活状态。
这里的话不会像之前一样使用这个CLH同步等待队列,而是加入了一种新的Condition条件等待队列,如下图。由firstWaiter和nextWaiter组成的单向链表队列,里面的waitStatus为CONDITION:-2 。也就是说如果当前生产者结点后面的结点又是一个生产者节点,因为期间可能存在多个生产者的线程,而为了唤醒接下来的消费者,就会创建一个条件等待队列,去存储后面的生产者结点。
就是说在CLH同步等待队列中,当前结点为生产者的话,在阻塞队列满了之后,如果CLH中的下一个节点还是生产者,则会将waitStatus的状态设置成-2,并将下一个节点移动到这个条件等待队列里面并进行排队,如果下一个结点还是,又会将下一个结点移动到这个条件等待队列里面并进行排队。知道下一个结点是消费者为止。
await()释放锁的流程如下
public final void await() throws InterruptedException { //线程是否被中断,如果被中断,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); //条件等待队列,会构建一个新的队列 Node node = addConditionWaiter(); //释放锁,并对对应的结点进行唤醒操作 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前结点是在条件队列里面还是在同步队列里面 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
构建条件等待队列如下
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
出队,消费者在获取产品时,产品就会出队,与此同时,在队列出队成功之后,队列中就会有一个空位,会调用notFull.signal()方法,通知生产者可以去生产产品了。并将这个条件等待队列放回这个CLH队列里面,只有在CLH队列里面才会获取锁。最后在CLH中才能进行unPark释放锁的操作。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //队列中有空位,通知生产者生产产品 notFull.signal(); return x; }
消费者获取产品
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
三,总结
BlockingQueue也是基于这个AQS的方式实现的,主要是利用这个生产者和消费者这个模型来实现。通过这个AQS中的CLH同步队列来对节点的锁的阻塞和释放,期间利用了这个条件等待队列来实现,如果存在多个生产者的线程的情况下,就会将这些线程加入到一个条件等待的队列里面。并将这个节点的状态改为-2,condition状态。在全部进入条件等待队列之后,这个锁还在并没有释放,因此最后又需要将这个条件等待队列里面的结点加回到CLH同步队列中,再进行排队的释放这个锁。结点出队的时候,然后生产者会通过一个singal监听这个消费者,每当这个阻塞队列里面出队,有一个位置的的时候,生产者就会生产这个产品。消费者也会监听这个队列,队列中只要不为空,就回去消费队列中的产品。
获取锁的条件
只有在CLH队列里等待的Node结点并且前驱结点的 waitStatus 为sinal = -1的可被唤醒的结点。
条件队列里面的这些节点是不能获取到锁的。