接着上篇BlockingQueue没讲完的
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞队列,相对于其它阻塞队列,LinkedBlockingQueue可以算是LinkedBlockingQueue与SynhronoousQueue结合,LinkedtransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部结构分为数据节点、请求节点,基于CAS无锁算法实现
与前面类似不再赘述
final boolean isData; volatile Object item; volatile Node next; volatile Thread waiter;
其中节点操作过程类似于SynchronousQueue
与SynchronousQueue有区别的是这个可以设置是否阻塞当前线程
NOW=0表示即时操作(可能失败),即不会阻塞调用线程
poll(获取并移除首元素,如果队列为空,直接返回null)
tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者则立即返回false,也不会将元素入队)
ASYNC=1表示异步操作(必然成功)
xfer被操作线程调用时,无论xfer操作过程多久完成,调用者都不会阻塞等待
offer,put,add(插入指定元素到队尾,由于是无界队列,所以会立即返回true)
SYNC=2表示同步操作(阻塞调用线程)
只有xfer操作过程达到了调用线程所期望的结果,调用者才会继续向下执行
PriorityBlockingQueue
优先级队列,里面是数组,但是数组与普通数组不一样,里面的数组维护了一颗堆的二叉树
默认大小为11,但是这个可以扩容
//默认容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //存储数据 private transient Object[] queue; //元素个数 private transient int size; //比较 private transient Comparator<? super E> comparator; //锁 private final ReentrantLock lock; //等待 private final Condition notEmpty; private transient volatile int allocationSpinLock;
扩容
如果容量小于64的时候,扩容为原来两倍+2;
如果容量大于64的时候,扩容为原来1.5倍
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常操作) Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {//通过CAS操作确保只有一个线程可以扩容 try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常 throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量 } if (newCap > oldCap && queue == array) newArray = new Object[newCap];//根据最新容量初始化一个新数组 } finally { allocationSpinLock = 0; } } if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU Thread.yield(); lock.lock();//这里重新加锁是确保数组复制操作只有一个线程能进行 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组 } }
添加元素
添加元素不会阻塞线程,因为该队列是一个无界队列,因为可以扩容,所以添加元素不会出现阻塞
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
取出元素
取出元素需要判断是否为空,如果为空则需要等待,不然直接返回
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
里面主要堆的上浮与下沉
另一个上浮的方法除了比较器不同以外其它都类似,所以就讲这一个
假设我们构造的是小根堆
private static <T> void siftUpComparable(int k, T x, Object[] array) { // 其中k就是当前放的末尾的位置 Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; //找到其父节点 Object e = array[parent]; if (key.compareTo((T) e) >= 0) //如果当前放入的值大于其父节点则跳出,否则继续 break; // 到这里说明当前放入的值小于其父节点,与父节点交换位置,并且k变为父节点的位置 array[k] = e; k = parent; } array[k] = key; }
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // 如果右孩子比左孩子小,则弄成右孩子 if (key.compareTo((T) c) <= 0) //如果传入的值小于孩子则退出 break; array[k] = c; k = child; } array[k] = key; } }
LinkedBlockingDeque
与LinkedBlockingQueue类似,只是这个是可以从两端存取,而LinkedBlockingQueue是单链表只能从一边存取,同时LinkedBlockingDeque只有一把锁,如果两把锁的话容易造成下标出错
DelayQueue
其中内部也是由一个PriorityQueue维护一个优先队列
add
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); //如果队列为空阻塞 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); //如果到期了就返回 first = null; // don't retain ref while waiting if (leader != null) // 没有到期且leader不为空,等待 available.await(); else { //头节点为空,设置当前线程为头节点 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
Leader-Follower线程模型
在Leader-follower线程模型中每个线程有三种模式:
leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待
follower:会一直尝试争抢leader,抢到leader之后才开始干活
processing:处理中的线程
感谢这位大佬 双子孤狼的博客