Java并发编程之LinkedBlockingQueue

简介: Java并发编程之LinkedBlockingQueue

LinkedBlockingQueue:



主要成员变量:


public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
     /**
     * 链表节点类
     */
    static class Node<E> {
        E item;
        //节点的后继
        Node<E> next;
        Node(E x) { item = x; }
    }
    /** 阻塞队列的最大容量, 如果未设置的话,该值为:Integer.MAX_VALUE */
    private final int capacity;
    /** 队列中当前的元素个数 */
    private final AtomicInteger count = new AtomicInteger();
    /**
     * 链表的头结点.
     * 头结点不存放具体元素:head.item == null
     */
    transient Node<E> head;
    /**
     * 链表的尾节点.
     * 尾节点的后继为null: last.next == null
     */
    private transient Node<E> last;
    /** 出队锁,take, poll操作时需要加该锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出队条件 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入队锁,put, offer操作时需要加该锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入队条件 */
    private final Condition notFull = putLock.newCondition();
 }


LinkedBlockingQueue内部的具体实现是一个自定义链表,维护了链表的头结点和尾节点,从头结点出队,从尾节点入队,从而实现了队列的功能。


2.阻塞有两重含义:


(1)当队列满了的时候阻塞入队,也就是说如果有线程往队列里面添加元素,如果此时队列已满,那么该线程将被阻塞直到其他线程消费队列里的元素为止。


(2)当队列为空的时候阻塞出队,也就是说如果有线程从队列里面获取元素,如果此时队列为空,那么该线程将被阻塞直到其他线程往队列里面写入元素为止。


3.入队操作:


/**
     * 向队列尾部插入一个元素,如果没有空间则阻塞直到空间可用
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public void put(E e) throws InterruptedException {
    //不能写入空元素
    if (e == null) throw new NullPointerException();
    // 标记写入时队列的元素个数
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //先要获取入队锁
    putLock.lockInterruptibly();
    try {
        /*
             * 循环检测队列是否已满,如果已满则阻塞
             * 考虑这种场景:A入队线程在等待入队条件,B线程消费了1个元素
             * 此时刚好C线程获取入队锁,进行入队操作,那么此时A线程被唤醒后
             * 必须要再次判断队列是否已满,否则会出现队列元素比容量大的问题
             * 所以此处要循环判断
             */
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队操作,在锁内部是安全的,修改链表指针位置即可
        enqueue(node);
        //获取入队时的元素个数,并将元素个数加1
        c = count.getAndIncrement();
        //如果入队后,队列仍未满,则唤醒其他等待写入的线程
        //考虑这种场景:A,B两个入队线程都在等待入队条件,C,D两个线程同时消费了2个元素
        //此时A,B都可以入队,但是入队操作只会唤醒一次(c==capacity),假如唤醒了A,
        //那么,A入队后要判断是否还有空间,继续唤醒B
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果写入时,队列里没有元素,则唤醒出队线程;因为此时可能有出队线程在等待
    if (c == 0)
        signalNotEmpty();
}
/**
     * 向队列尾部插入一个元素,若空间已满则最多等待指定 时间.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //与put一样,都是先获取锁
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            //等待超时,返回
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
/**
     * 向队列尾部插入元素,如果队列已满直接返回fasle;否则,尝试写入,写入成功返回true,失败返回false
     * @throws NullPointerException if the specified element is null
     */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //队列已满,直接返回
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //先获取写入锁
    putLock.lock();
    try {
        //判断队列是否已满
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    //判断释放写入,如果写入成功c != -1
    return c >= 0;
}


4.出队操作:


//从队列头部出队,如果队列为空则阻塞
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //循环判断队列是否为空,若为空,则阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队,在锁内部,修改头节点指针即可
        x = dequeue();
        //获取出队前队列元素个数,并将元素个数减一
        c = count.getAndDecrement();
        //如果出队后还有元素,则唤醒其他出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果出队前空间已满,则说明此时出队后,还有1个位置,此时唤醒入队线程
    if (c == capacity)
        signalNotFull();
    return x;
}
//从队列头部出队,如果队列为空则阻塞指定的时间,获取成功返回元素,否则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //此处的循环判断原因,同put内部的循环判断
        while (count.get() == 0) {
            //获取超时返回空
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //同上
    if (c == capacity)
        signalNotFull();
    return x;
}
//出队,如果队列为空则返回null
public E poll() {
    final AtomicInteger count = this.count;
    //队列为空,立即返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lock();
    try {
        //队列不为空,则获取
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
//获取队头元素,此操作不出队,仅仅是读取
public E peek() {
    //队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁,但是不会出队,仅仅读取
    takeLock.lock();
    try {
        //读取队头元素
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}


5.可以看到JDK内部是通过一个单向链表的数据结构,配合入队的ReentrantLock以及出队的ReentrantLock最终实现了阻塞队列的语义。

目录
相关文章
|
7天前
|
Kubernetes 负载均衡 Java
k8s的出现解决了java并发编程胡问题了
Kubernetes通过提供自动化管理、资源管理、服务发现和负载均衡、持续交付等功能,有效地解决了Java并发编程中的许多复杂问题。它不仅简化了线程管理和资源共享,还提供了强大的负载均衡和故障恢复机制,确保应用程序在高并发环境下的高效运行和稳定性。通过合理配置和使用Kubernetes,开发者可以显著提高Java应用程序的性能和可靠性。
57 31
|
8天前
|
Java 编译器 开发者
注解的艺术:Java编程的高级定制
注解是Java编程中的高级特性,通过内置注解、自定义注解及注解处理器,可以实现代码的高度定制和扩展。通过理解和掌握注解的使用方法,开发者可以提高代码的可读性、可维护性和开发效率。在实际应用中,注解广泛用于框架开发、代码生成和配置管理等方面,展示了其强大的功能和灵活性。
60 25
|
10天前
|
Java 开发工具
课时6:Java编程起步
课时6:Java编程起步,主讲人李兴华。课程摘要:介绍Java编程的第一个程序“Hello World”,讲解如何使用记事本或EditPlus编写、保存和编译Java源代码(*.java文件),并解释类定义、主方法(public static void main)及屏幕打印(System.out.println)。强调类名与文件名一致的重要性,以及Java程序的编译和执行过程。通过实例演示,帮助初学者掌握Java编程的基本步骤和常见问题。
|
3月前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
3月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
287 2
|
3月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
102 12
|
3月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
3月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
80 3
|
3月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
60 0
|
3月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!