谈到ArrayBlockingQueue的特色就是循环队列,然后一把锁,2个条件,完成了功能。本来以为LinkedBlockingQueue也是这样的,结果和预期不一样,LinkedBlockingQueue利用了链表的特点,使用了两把锁,两个条件来控制。是一个锁分离的应用,下面就说说,他的实现,以及为什么ArrayBlockingQueue就不适合锁分离。
主要成员变量
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final AtomicInteger count = new AtomicInteger();
除了两个锁,两个条件外,我这里专门列举了计数器。这个计数器很重要,重要到锁分离要依赖他才能正常运行。
锁分离
使用双锁分离就得注意一点,那就是防止线程夯死。生产线程要唤醒生产线程,消费线程也要唤醒生产线程,消费线程唤醒消费线程,消费线程也要唤醒生产线程。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
//唤醒标记
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//阻塞生产线程
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//唤醒生产线程
notFull.signal();
} finally {
putLock.unlock();
}
//唤醒消费线程
if (c == 0)
signalNotEmpty();
}
生产线程唤醒消费线程
基于上面的介绍,我们来看代码,唤醒标记就是为了生产唤醒消费的,因为可能出现消费线程全部都已经等待了,此时生产线程运作,但是消费线程并不能自己唤醒自己,于是就有了signalNotEmpty()的操作。这里的c是getAndIncrement的值,就是获取计数之前的值。c==0的满足条件就有1个元素,在这种情况下才去唤醒消费线程。
生产线程唤醒生产线程
在获取锁后,如果发现容量达到上限,就阻塞了,等待被唤醒,如果可以加入,就执行enqueue方法,是个很简单的链表添加节点的方法。就是在原来last节点后加节点,然后更新last节点。
private void enqueue(Node<E> node) {
last = last.next = node;
}
在计数器自增后,判断唤醒标记,如果还能继续生产,就去唤醒生产线程。
消费的方案思想和生产类似,这里就不说代码了。
删除
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
删除代码相对比较简单,主要是要获取两把锁,才能进行删除操作就是fullyLock()和fullyUnlock(),删除掉元素后,还要唤醒生产线程。
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
ArrayBlockingQueue为何不适合锁分离
这个主要是循环队列的原因,主要是数组和链表不同,链表队列的添加和头部的删除,都是只和一个节点相关,添加只往后加就可以,删除只从头部去掉就好。为了防止head和tail相互影响出现问题,这里就需要原子性的计数器,头部要移除,首先得看计数器是否大于0,每个添加操作,都是先加入队列,然后计数器加1,这样保证了,队列在移除的时候,长度是大于等于计数器的,通过原子性的计数器,双锁才能互不干扰。数组的一个问题就是位置的选择没有办法原子化,因为位置会循环,走到最后一个位置后就返回到第一个位置,这样的操作无法原子化,所以只能是加锁来解决。
适用场景
LinkedBlockingQueue的优点是锁分离,那就很适合生产和消费频率差不多的场景,这样生产和消费互不干涉的执行,能达到不错的效率,尽量不使用remove操作,获取两把锁的效率更低,可以使用size方法(就是计数器直接返回),这个还是比较重要的,有些集合不适合使用size,例如ConcurrentLinkedQueue,正确应该使用isEmpty()。