LinkedBlockingQueue源码解读

简介: LinkedBlockingQueue源码解读

Node LinkedBlockingQueue链表节点,单向节点

//Node节点类,单向节点
  static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         继任节点
         * - this Node, meaning the successor is head.next         
         * - null, meaning there is no successor (this is the last node)
            next为空意味当前节点为链尾
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

构造方法

//默认无界队列
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

LinkedBlockingQueue指定容量,链头和链尾都非空对象但item为空

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

全局变量

 /** The capacity bound, or Integer.MAX_VALUE if none */
    //链表容量,默认Integer.MAX_VALUE,即无界队列
    private final int capacity;

    /** Current number of elements */
    //当前队列元素总量
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list. 链头
     * Invariant: head.item == null //不变形:链头元素永为空
     */
    transient Node<E> head;

    /**
     * Tail of linked list. 链尾
     * Invariant: last.next == null //不变形:链尾元素后再无元素
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    //拿锁,在 take, poll等方法时会请求
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    //队列非空条件,以便通知队列进行取元素
    private final Condition notEmpty = takeLock.newCondition();

    
    /** Lock held by put, offer, etc */
    //插入锁,在  put, offer等方法时会请求
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    //队列非空条件,以便同意队列进行插入元素
    private final Condition notFull = putLock.newCondition();

signalNotEmpty链表非空,然后signal(通知) takeLock进行获取元素

//仅在put/offer后调用
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//强制拿锁
        try {
            notEmpty.signal();//触发signal
        } finally {
            takeLock.unlock();
        }
    }

signalNotFull链表非满,然后signal(通知) putLock进行插入元素

//仅在take/poll后调用
 private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

enqueue插入元素

 private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        //putLock必须获取当前锁
        // assert last.next == null;
        //队列尾必须为空
        //新node指向原队列尾元素的next(链下个对象),然后再指向last(链尾)
        last = last.next = node;
    }

dequeue取元素

 private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        //当前线程必须持有takeLock
        // assert head.item == null;
        //当前链头元素必须为空
        //备份链头
        Node<E> h = head;
        //备份链次元素(实际要取出的元素,定义为first)
        Node<E> first = h.next;
        h.next = h; // help GC //原链头h已经无作用
        head = first;//first指向head,first成为新链头
        E x = first.item;//取出目标元素
        first.item = null;//置空(LinkBlockingQueue规范)
        return x;
    }

fullyLock&fullyUnlock全局拿锁和放锁

  void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

Collection插入到队列

 public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);//默认无界队列
        //获取pulLock,构造函数中pulLock从无竞争,但需要保证可见性        
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                //不支持空对象
                if (e == null)
                    throw new NullPointerException();
                //不允许超过限度    
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);//原子更新
        } finally {
            putLock.unlock();
        }
    }

put放入对象

 public void put(E e) throws InterruptedException {
        //不支持空对象
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        //预创建操作标记(-1即无操作)
        int c = -1;
        //创建节点
        Node<E> node = new Node<E>(e);
        //备份(副本)putLock和当前总量
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //请求putLock锁(可打断)
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
             //容量已满则等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //执行过插入至链尾
            enqueue(node);
            //原子自增一位,返回旧值
            c = count.getAndIncrement();
            //链表还没有满,通知其他线程执行插入
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
    }

offer带时间限制的插入,返回操作结果

  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //严禁非空对象
        if (e == null) throw new NullPointerException();
        //获取等待时间
        long nanos = unit.toNanos(timeout);
        //预创建操作标记(-1即无操作)
        int c = -1;
        //获取putLock锁和当前链表容量
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //可打断的请求锁
        putLock.lockInterruptibly();
        try {
            //如果已满载,并且
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                //等待nanos(毫秒)秒,期间收到signal则返回(nanos-等待时间),否则在等待结束后返回0或负数
                //可打断并返回InterruptedException
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));//插入元素
            c = count.getAndIncrement();//获取最新容量高
            if (c + 1 < capacity)//未满载则通知其他线程进行put/offer
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
        if (c == 0)//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
            signalNotEmpty();
        return true;
    }

offer尝试插入(一旦尝试插入则一直等待直至成功)

   public boolean offer(E e) {
        //元素不能为空
        if (e == null) throw new NullPointerException();
        //当前链表容量
        final AtomicInteger count = this.count;
        //满的话则返回false
        if (count.get() == capacity)
            return false;
        //预创建操作标记(-1即无操作)
        int c = -1;
        Node<E> node = new Node<E>(e);
        //获取putlock
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //未满载
            if (count.get() < capacity) {
                enqueue(node);//插入元素
                c = count.getAndIncrement();
                //队列未满则继续通知插入
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)//存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
            signalNotEmpty();//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
        return c >= 0;//链表有元素则表示插入成功
    }

take取元素(可打断)

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //如果当前链表空,则等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            //取出首元素(first) E
            x = dequeue();
            c = count.getAndDecrement();//链表容量(原子)减一,并返回旧值
            //链表非空则继续通知其他线程来取
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();//放锁
        }
        //如果链满则通知其他putLock等待的线程进行取元素
        //意思是,takeLock和putLock同时进行时,putLock一直在放元素,true表示有一条线程在等待插入元素
        if (c == capacity)
            signalNotFull();
        return x;
    }

poll取元素,有时间

   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;//定义待取得元素
        int c = -1;//操作标记
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可打断请求
        try {
            //如果链表无元素,则执行等待,直至耗时完毕
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//取元素
            c = count.getAndDecrement();//获取原来链表长度然后长度减一
            if (c > 1)//如果链长度大于1,证明还有链还有元素,通知其他等待的takeLock执行取元素
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)//原链长度达至最大长度,即在取完元素后还可以放一个元素,所以执行通知putLock进行放元素
            signalNotFull();
        return x;
    }

poll取元素

  public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)//当前链无元素,直接返回
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//获取锁,否则一直等待
        try {
            if (count.get() > 0) {//获取锁成功,并链有元素
                x = dequeue();//取元素
                c = count.getAndDecrement();//获取原链表长度然后实际长度减一
                if (c > 1)//原链表长度还有元素则通知继续进行通知其他线程取元素
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //原链表长度达至满,则表示刚取完还可以放一个元素,所以执行通知
        if (c == capacity)
            signalNotFull();
        return x;
    }

peek取元素,但不拆链

  public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

unlink 拆链,将trail的下个元素p从链中拆除

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();//必须获takeLock和putLock,合称fullyLock();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        
        p.item = null;//p元素item置null
        trail.next = p.next;//trail和(p.next)建立链关系
        //如果原p就是尾元素,则置trail为尾元素
        if (last == p)
            last = trail;
        //获取原链长度并减一,原链长度等于限额则表示链未满,则通知进行插入
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

remove拆除item所在的链

   public boolean remove(Object o) {
        //LinkedBlockingQueue允许null的item,除了head和last
        if (o == null) return false;
        fullyLock();//全局锁
        try {
            //迭代链表,发现首元素则拆除链
            //
            for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

contains o是否存在链表中

   public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            //p为空则表示到达链尾,否则原本就是空链
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

toArray迭代元素返回数组

 public Object[] toArray() {
        fullyLock();//获取全局锁
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

toArray迭代元素,然后插入数组a

 public <T> T[] toArray(T[] a) {
        fullyLock();//获取全锁
        try {
            int size = count.get();//当前链表长度
            if (a.length < size)//参数数组a长度小于当前链表长度,则进行扩容
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;//迭代下标
            //迭代链表并且将item存入数组a
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            //如果参数数组a长度长于链长度,则a下标的元素置空,但(k+1)往后的元素呢?又不置空?
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

toString

public String toString() {
        fullyLock();//全局锁
        try {
            Node<E> p = head.next;//获取首元素,为空则直接返回[]
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;//获取元素,然后组装,如果item为当前链表则返回this Collection
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;//为空则到达链尾
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

clear清空链

 public void clear() {
        fullyLock();//全局锁
        try {
            //1,取出实际头元素p,备份原head至h               4,将p定于为新head     
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;//2,原head的next指向原head,造成循坏链,并item为空
                p.item = null;//3,实际头元素item置空,help gc
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

drainTo转换item至Collection

public int drainTo(Collection<? super E> c, int maxElements) {
        //集合不能为空并不能为当前元素
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;//是否未满,以通知来插入
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//请求tackLock
        try {
            //maxElements和count,取小的一方
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                //迭代链
                while (i < n) {
                    //取出节点head次元素
                    Node<E> p = h.next;
                    //添加至collections
                    c.add(p.item);
                    //然后置空
                    p.item = null;
                    //原head自关联
                    h.next = h;
                    //定义新head
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                //恢复链原有状态,即使c.add()抛异常
                if (i > 0) {//如果已经迭代过元素将迭代中的h作为新head
                    //assert h.item == null;
                    head = h;
                    //如果迭代完链的原长度的为capacity(链容量最大值)则表示此链没有满,通知putlock进行插入
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

Itr迭代器

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;
        //构造函数
        Itr() {
            fullyLock();
            try {//下个元素
                current = head.next;
                //下个元素item
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        //下个月元素是否为空
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                //如果自连链,则证明链头(clear()和drainTo()回导致此情况)
                if (s == p)
                    return head.next;
                //否则返回p.next(s != null && s.item == null仅在链头时发生)    
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        public E next() {
            fullyLock();
            try {
                //hasNext已经判断了不为空
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                //current == null表示已经达到链尾
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            //hasNext已经判断了不为空
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                     //迭代链,找到p然后拆链
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }
目录
相关文章
|
1月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
32 0
|
8月前
|
存储 安全 Java
【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别
【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别
|
1月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
186 1
|
1月前
|
存储 缓存 Java
Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue
SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下: 1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。 2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。 3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先
44 5
|
1月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue
`LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。 `xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹
257 5
|
1月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
262 5
|
1月前
|
缓存 安全 Java
Java并发基础:SynchronousQueue全面解析!
SynchronousQueue的优点在于其直接性和高效性,它实现了线程间的即时数据交换,无需中间缓存,确保了数据传输的实时性和准确性,同时,其灵活的阻塞机制使得线程同步变得简单而直观,适用于需要精确协调的生产者-消费者模型。
Java并发基础:SynchronousQueue全面解析!
|
1月前
|
存储 监控 安全
Java并发基础:LinkedBlockingQueue全面解析!
LinkedBlockingQueue类是以链表结构实现高效线程安全队列,具有出色的并发性能、灵活的阻塞与非阻塞操作,以及适用于生产者和消费者模式的能力,此外,LinkedBlockingQueue还具有高度的可伸缩性,能够在多线程环境中有效管理数据共享,是提升程序并发性能和稳定性的关键组件。
Java并发基础:LinkedBlockingQueue全面解析!
|
11月前
|
存储 安全 Java
LinkedBlockingQueue 原理
LinkedBlockingQueue 原理
|
缓存 安全 Java
JUC系列学习(四):线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue

热门文章

最新文章