import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class LinkedBlockingQueueTest<E> { //添加数据时的锁 private final ReentrantLock putLock = new ReentrantLock(); //获取数据时的锁 private final ReentrantLock getLock = new ReentrantLock(); //容器容量 private final int capacity; //当前元素总数量 private final AtomicInteger currentElementCount = new AtomicInteger(); //一旦变量被transient修饰,变量将不再是对象持久化的一部分,该变量内容在序列化后无法获得访问。 //尾巴节点 private transient Node<E> last; //头节点 transient Node<E> head; /** * Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作, * 相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。 */ //等待拿数据的等待队列 private Condition notNull = getLock.newCondition(); //等待添加数据的等待队列 private Condition notFull = putLock.newCondition(); public void put(E element) throws InterruptedException{ if (element == null) throw new InterruptedException(); int count = -1; Node<E> currentNeedAddNode = new Node<>(element); final ReentrantLock putLock = this.putLock; final AtomicInteger currentElementCount = this.currentElementCount; //上锁 putLock.lockInterruptibly(); try{ //当前元素数量等于容量大小时线程等待 while (currentElementCount.get() == capacity){ notFull.await(); } enqueue(currentNeedAddNode); count = currentElementCount.incrementAndGet(); if (count + 1 < capacity) //唤醒 notFull.signal(); }finally { //锁释放 putLock.unlock(); } if (count == 0) signalNotEmpty(); } public E get() throws InterruptedException { E getElement; int count = -1; final ReentrantLock getLock = this.getLock; final AtomicInteger currentElementCount = this.currentElementCount; //上锁 getLock.lockInterruptibly(); try { while (currentElementCount.get() == 0){ //等待 notNull.await(); } getElement = dequeue(); count = currentElementCount.getAndDecrement(); if (count > 1) //唤醒 notNull.signal(); }finally { getLock.unlock(); } if (count == capacity) signalNotFull(); return getElement; } //当获取数据时发出等待信号,一般不会阻塞添加数据 private void signalNotEmpty() { final ReentrantLock getLock = this.getLock; getLock.lock(); try { notNull.signal(); }finally { getLock.unlock(); } } //添加数据时的等待信号,一般不会阻塞获取数据 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //从尾节点插入 private void enqueue(Node<E> element){ last.next = element; last = last.next; } //获取头节点元素并移除 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; } public LinkedBlockingQueueTest() { this(Integer.MAX_VALUE); } public LinkedBlockingQueueTest(int size) { if (size <= 0) throw new IllegalArgumentException(); this.capacity = size; //给头节点尾节点初始化 head = new Node<>(null); last = head; } /** * 链表节点类 * @param <E> */ static class Node<E> { E item; //真正的后续节点,如果head.next = null,表示没有后续节点(这是最后一个节点) Node<E> next; Node(E x) { item = x; } } }