【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue

简介: 简介 上一篇【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue学习了并发队列ConcurrentLinkedQueue,它是一个非阻塞无界队列。本文来学习下JUC中的一个阻塞有界队列-LinkedBlockingQueue。

简介

上一篇【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue学习了并发队列ConcurrentLinkedQueue,它是一个非阻塞无界队列。本文来学习下JUC中的一个阻塞有界队列-LinkedBlockingQueue。

LinkedBlockingQueue


如图继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

LinkedBlockingQueue

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
// 如果没传capacity 则默认使用Integer.MAX_VALUE作为队列大小
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
 //设置大小为capacity,并设置item为null的head和last辅助节点
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, initially containing the elements of the
 * given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //加入队锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        //依次将Collection中的元素加入队列
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        //设置队列大小n
        count.set(n);
    } finally {
        //解锁
        putLock.unlock();
    }
}

可以从构造函数看出,LinkedBlockingQueue是一个有界的队列,队列最大值为capacity,如果初始化时不设置队列大小,则默认大小为Integer.MAX_VALUE

put

将元素加入队列,如果队列满,则一直等待,直到线程被中断或被唤醒

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final int c;
    final Node<E> node = new Node<E>(e);
    
    //入队锁,如果收到中断信号,则抛出异常
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
         //如果队列满了,则通知线程进入await状态。
        while (count.get() == capacity) {
            notFull.await();
        }
        //将node加入队列
        enqueue(node);
        //队列元素数加一
        c = count.getAndIncrement();
        //如果队列没满,则唤醒await的线程进行入队操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    //如果是第一次添加元素,则通知等待的读线程可以开始读数据了
    if (c == 0)
        signalNotEmpty();
}

offer

/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.
 * When using a capacity-restricted queue, this method is generally
 * preferable to method {@link BlockingQueue#add add}, which can fail to
 * insert an element only by throwing an exception.
 *
 * @throws NullPointerException if the specified element is null
 */
 //将元素加入队列,如果队列满,则直接返回false
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //如果队列满了,则直接返回false。
    if (count.get() == capacity)
        return false;
    final int c;
    final Node<E> node = new Node<E>(e);
    //加入队锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //再次判断, 队列是否满了,避免在第一次判断后和加锁前,队列被加满
        if (count.get() == capacity)
            return false;
        //将node添加到队列中
        enqueue(node);
        c = count.getAndIncrement();
        //如果队列没满,则唤醒await的线程进行入队操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    //如果是第一次添加元素,则通知等待的读线程可以开始读数据了
    if (c == 0)
        signalNotEmpty();
    return true;
}

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available.
 *
 * @return {@code true} if successful, or {@code false} if
 *         the specified waiting time elapses before space is available
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
 //将元素加入队列,可以设置等待超时时间,如果队列满,则等待timeout毫秒,超时返回false
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    final int c;
    //加锁
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //如果队列满了,则等待timeout毫秒,超时则返回false
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        //入队
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        //如果队列没满,则唤醒await的线程进行入队操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    //如果是第一次添加元素,则通知等待的读线程可以开始读数据了
    if (c == 0)
        signalNotEmpty();
    return true;
}

take

从队列中取出元素,如果队列为空,则一直等待,直到线程被中断或被唤醒

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    //加出队锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //如果队列为空,则通知线程进入await状态。
        while (count.get() == 0) {
            notEmpty.await();
        }
        //从队列头部取出元素
        x = dequeue();
        //count减一
        c = count.getAndDecrement();
        //如果队列不为空,则唤醒出队等待线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        //释放锁
        takeLock.unlock();
    }
    //如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
    if (c == capacity)
        signalNotFull();
    return x;
}

poll

public E poll() {
    final AtomicInteger count = this.count;
    //如果队列为空,直接返回null
    if (count.get() == 0)
        return null;
    final E x;
    final int c;
    //加出队锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //如果队列为空,直接返回null
        if (count.get() == 0)
            return null;
        //出队,移除第一个数据节点
        x = dequeue();
        c = count.getAndDecrement();
        //如果队列不为空,则唤醒出队等待线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
    if (c == capacity)
        signalNotFull();
    return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        final E x;
        final int c;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,则等待timeout时间, 超时返回null
            while (count.get() == 0) {
                if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            //出队列
            x = dequeue();
            c = count.getAndDecrement();
            //如果队列不为空,则唤醒出队等待线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        //如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
        if (c == capacity)
            signalNotFull();
        return x;
    }

peek

public E peek() {
    final AtomicInteger count = this.count;
    //如果队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //队列不为空返回第一个数据节点的元素,不移除节点,为空则返回null
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}

remove

/**
 * Removes a single instance of the specified element from this queue,
 * if it is present.  More formally, removes an element {@code e} such
 * that {@code o.equals(e)}, if this queue contains one or more such
 * elements.
 * Returns {@code true} if this queue contained the specified element
 * (or equivalently, if this queue changed as a result of the call).
 *
 * @param o element to be removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o) {
    //如果移除的元素为null,在返回false
    if (o == null) return false;
    //加入队和出队锁
    fullyLock();
    try {
        //遍历队列,存在元素o则移除,返回true,否则返回false
        for (Node<E> pred = head, p = pred.next;
             p != null;
             pred = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, pred);
                return true;
            }
        }
        return false;
    } finally {
        //释放入队锁和出队锁
        fullyUnlock();
    }
}

add、element

都在AbstractQueue中实现,上文【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue中已分析,不再赘述。

总结

通过源码分析,我们了解到了入队和出队的机制。初始化时,需要设置队列大小, 在队列满时,入队操作会等待,队列为空时,出队操作会等待。即LinkedBlockingQueue是一个有界的阻塞队列。
和ConcurrentLinkedQueue对比,LinkedBlockingQueue采用锁分离,比较适合生产和消费频率差不多的场景,并且锁同步更适合单消费者的任务队列,而ConcurrentLinkedQueue使用CAS,并发性能较高更适合消费者多的消息队列。
在常用线程池中,Executors.newFixedThreadPool也是采用的LinkedBlockingQueue作为workQueue,在线程数超过corePoolSize后,会将任务加入到workQueue中等待处理。关于线程池的使用,后面会详细展开。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
5天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第12天】 在现代软件开发中,多线程编程是提升应用程序性能和响应能力的关键手段之一。特别是在Java语言中,由于其内置的跨平台线程支持,开发者可以轻松地创建和管理线程。然而,随之而来的并发问题也不容小觑。本文将探讨Java并发编程的核心概念,包括线程安全策略、锁机制以及性能优化技巧。通过实例分析与性能比较,我们旨在为读者提供一套既确保线程安全又兼顾性能的编程指导。
|
5天前
|
数据采集 安全 Java
Java并发编程学习12-任务取消(上)
【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容
30 4
Java并发编程学习12-任务取消(上)
|
1天前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
21 2
|
1天前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
15 2
|
2天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
12 5
|
2天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
2天前
|
安全 Java
Java一分钟之-并发编程:原子类(AtomicInteger, AtomicReference)
【5月更文挑战第18天】Java并发编程中的原子类如`AtomicInteger`和`AtomicReference`提供无锁原子操作,适用于高性能并发场景。`AtomicInteger`支持原子整数操作,而`AtomicReference`允许原子更新对象引用。常见问题包括误解原子性、过度依赖原子类以及忽略对象内部状态的并发控制。要避免这些问题,需明确原子操作边界,合理选择同步策略,并精确控制原子更新。示例代码展示了如何使用这两个类。正确理解和使用原子类是构建高效并发程序的关键。
12 1
|
2天前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
9 1
|
2天前
|
Java 编译器
Java并发编程中的锁优化策略
【5月更文挑战第18天】在Java并发编程中,锁是一种常用的同步机制,用于保护共享资源的访问。然而,不当的锁使用可能导致性能问题和死锁风险。本文将探讨Java中锁的优化策略,包括锁粗化、锁消除、锁分离和读写锁等技术,以提高并发程序的性能和可靠性。
|
3天前
|
Java 编译器
Java 并发编程中的锁优化策略
【5月更文挑战第17天】在 Java 并发编程中,锁是一种常见的同步机制,用于保护共享资源的访问。然而,不当使用锁可能导致性能问题和死锁风险。本文将探讨 Java 中的锁优化策略,包括锁粗化、锁消除、锁降级以及读写锁等技术,以提高并发程序的性能和可靠性。