Java并发编程之LinkedBlockingQueue

简介: Java并发编程之LinkedBlockingQueue

LinkedBlockingQueue:



主要成员变量:


public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
     /**
     * 链表节点类
     */
    static class Node<E> {
        E item;
        //节点的后继
        Node<E> next;
        Node(E x) { item = x; }
    }
    /** 阻塞队列的最大容量, 如果未设置的话,该值为:Integer.MAX_VALUE */
    private final int capacity;
    /** 队列中当前的元素个数 */
    private final AtomicInteger count = new AtomicInteger();
    /**
     * 链表的头结点.
     * 头结点不存放具体元素:head.item == null
     */
    transient Node<E> head;
    /**
     * 链表的尾节点.
     * 尾节点的后继为null: last.next == null
     */
    private transient Node<E> last;
    /** 出队锁,take, poll操作时需要加该锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出队条件 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入队锁,put, offer操作时需要加该锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入队条件 */
    private final Condition notFull = putLock.newCondition();
 }


LinkedBlockingQueue内部的具体实现是一个自定义链表,维护了链表的头结点和尾节点,从头结点出队,从尾节点入队,从而实现了队列的功能。


2.阻塞有两重含义:


(1)当队列满了的时候阻塞入队,也就是说如果有线程往队列里面添加元素,如果此时队列已满,那么该线程将被阻塞直到其他线程消费队列里的元素为止。


(2)当队列为空的时候阻塞出队,也就是说如果有线程从队列里面获取元素,如果此时队列为空,那么该线程将被阻塞直到其他线程往队列里面写入元素为止。


3.入队操作:


/**
     * 向队列尾部插入一个元素,如果没有空间则阻塞直到空间可用
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public void put(E e) throws InterruptedException {
    //不能写入空元素
    if (e == null) throw new NullPointerException();
    // 标记写入时队列的元素个数
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //先要获取入队锁
    putLock.lockInterruptibly();
    try {
        /*
             * 循环检测队列是否已满,如果已满则阻塞
             * 考虑这种场景:A入队线程在等待入队条件,B线程消费了1个元素
             * 此时刚好C线程获取入队锁,进行入队操作,那么此时A线程被唤醒后
             * 必须要再次判断队列是否已满,否则会出现队列元素比容量大的问题
             * 所以此处要循环判断
             */
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队操作,在锁内部是安全的,修改链表指针位置即可
        enqueue(node);
        //获取入队时的元素个数,并将元素个数加1
        c = count.getAndIncrement();
        //如果入队后,队列仍未满,则唤醒其他等待写入的线程
        //考虑这种场景:A,B两个入队线程都在等待入队条件,C,D两个线程同时消费了2个元素
        //此时A,B都可以入队,但是入队操作只会唤醒一次(c==capacity),假如唤醒了A,
        //那么,A入队后要判断是否还有空间,继续唤醒B
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果写入时,队列里没有元素,则唤醒出队线程;因为此时可能有出队线程在等待
    if (c == 0)
        signalNotEmpty();
}
/**
     * 向队列尾部插入一个元素,若空间已满则最多等待指定 时间.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //与put一样,都是先获取锁
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            //等待超时,返回
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
/**
     * 向队列尾部插入元素,如果队列已满直接返回fasle;否则,尝试写入,写入成功返回true,失败返回false
     * @throws NullPointerException if the specified element is null
     */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //队列已满,直接返回
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //先获取写入锁
    putLock.lock();
    try {
        //判断队列是否已满
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    //判断释放写入,如果写入成功c != -1
    return c >= 0;
}


4.出队操作:


//从队列头部出队,如果队列为空则阻塞
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //循环判断队列是否为空,若为空,则阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队,在锁内部,修改头节点指针即可
        x = dequeue();
        //获取出队前队列元素个数,并将元素个数减一
        c = count.getAndDecrement();
        //如果出队后还有元素,则唤醒其他出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果出队前空间已满,则说明此时出队后,还有1个位置,此时唤醒入队线程
    if (c == capacity)
        signalNotFull();
    return x;
}
//从队列头部出队,如果队列为空则阻塞指定的时间,获取成功返回元素,否则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //此处的循环判断原因,同put内部的循环判断
        while (count.get() == 0) {
            //获取超时返回空
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //同上
    if (c == capacity)
        signalNotFull();
    return x;
}
//出队,如果队列为空则返回null
public E poll() {
    final AtomicInteger count = this.count;
    //队列为空,立即返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lock();
    try {
        //队列不为空,则获取
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
//获取队头元素,此操作不出队,仅仅是读取
public E peek() {
    //队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁,但是不会出队,仅仅读取
    takeLock.lock();
    try {
        //读取队头元素
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}


5.可以看到JDK内部是通过一个单向链表的数据结构,配合入队的ReentrantLock以及出队的ReentrantLock最终实现了阻塞队列的语义。

目录
相关文章
|
8天前
|
算法 Java
【编程基础知识】Java打印九九乘法表
本文介绍了在Java中实现九九乘法表的三种方法:嵌套循环、数组和流控制。通过代码示例、流程图和表格对比,帮助读者深入理解每种方法的优缺点,提升编程技能。
30 2
|
8天前
|
存储 Java
【编程基础知识】 分析学生成绩:用Java二维数组存储与输出
本文介绍如何使用Java二维数组存储和处理多个学生的各科成绩,包括成绩的输入、存储及格式化输出,适合初学者实践Java基础知识。
37 1
|
4天前
|
安全 Java UED
Java中的多线程编程:从基础到实践
本文深入探讨了Java中的多线程编程,包括线程的创建、生命周期管理以及同步机制。通过实例展示了如何使用Thread类和Runnable接口来创建线程,讨论了线程安全问题及解决策略,如使用synchronized关键字和ReentrantLock类。文章还涵盖了线程间通信的方式,包括wait()、notify()和notifyAll()方法,以及如何避免死锁。此外,还介绍了高级并发工具如CountDownLatch和CyclicBarrier的使用方法。通过综合运用这些技术,可以有效提高多线程程序的性能和可靠性。
|
4天前
|
缓存 Java UED
Java中的多线程编程:从基础到实践
【10月更文挑战第13天】 Java作为一门跨平台的编程语言,其强大的多线程能力一直是其核心优势之一。本文将从最基础的概念讲起,逐步深入探讨Java多线程的实现方式及其应用场景,通过实例讲解帮助读者更好地理解和应用这一技术。
19 3
|
4天前
|
Java 开发者
在Java编程中,正确的命名规范不仅能提升代码的可读性和可维护性,还能有效避免命名冲突。
【10月更文挑战第13天】在Java编程中,正确的命名规范不仅能提升代码的可读性和可维护性,还能有效避免命名冲突。本文将带你深入了解Java命名规则,包括标识符的基本规则、变量和方法的命名方式、常量的命名习惯以及如何避免关键字冲突,通过实例解析,助你写出更规范、优雅的代码。
25 3
|
4天前
|
Java 程序员
在Java编程中,关键字不仅是简单的词汇,更是赋予代码强大功能的“魔法咒语”。
【10月更文挑战第13天】在Java编程中,关键字不仅是简单的词汇,更是赋予代码强大功能的“魔法咒语”。本文介绍了Java关键字的基本概念及其重要性,并通过定义类和对象、控制流程、访问修饰符等示例,展示了关键字的实际应用。掌握这些关键字,是成为优秀Java程序员的基础。
11 3
|
4天前
|
Java 程序员 编译器
在Java编程中,保留字(如class、int、for等)是具有特定语法意义的预定义词汇,被语言本身占用,不能用作变量名、方法名或类名。
在Java编程中,保留字(如class、int、for等)是具有特定语法意义的预定义词汇,被语言本身占用,不能用作变量名、方法名或类名。本文通过示例详细解析了保留字的定义、作用及与自定义标识符的区别,帮助开发者避免因误用保留字而导致的编译错误,确保代码的正确性和可读性。
15 3
|
4天前
|
算法 Java
在Java编程中,关键字和保留字是基础且重要的组成部分,正确理解和使用它们
【10月更文挑战第13天】在Java编程中,关键字和保留字是基础且重要的组成部分。正确理解和使用它们,如class、int、for、while等,不仅能够避免语法错误,还能提升代码的可读性和执行效率。本指南将通过解答常见问题,帮助你掌握Java关键字的正确使用方法,以及如何避免误用保留字,使你的代码更加高效流畅。
19 3
|
3天前
|
存储 安全 Java
了解final关键字在Java并发编程领域的作用吗?
在Java并发编程中,`final`关键字不仅用于修饰变量、方法和类,还在多线程环境中确保对象状态的可见性和不变性。本文深入探讨了`final`关键字的作用,特别是其在final域重排序规则中的应用,以及如何防止对象的“部分创建”问题,确保线程安全。通过具体示例,文章详细解析了final域的写入和读取操作的重排序规则,以及这些规则在不同处理器上的实现差异。
了解final关键字在Java并发编程领域的作用吗?
|
6天前
|
设计模式 SQL 安全
【编程进阶知识】Java单例模式深度解析:饿汉式与懒汉式实现技巧
本文深入解析了Java单例模式中的饿汉式和懒汉式实现方法,包括它们的特点、实现代码和适用场景。通过静态常量、枚举类、静态代码块等方式实现饿汉式,通过非线程安全、同步方法、同步代码块、双重检查锁定和静态内部类等方式实现懒汉式。文章还对比了各种实现方式的优缺点,帮助读者在实际项目中做出更好的设计决策。
22 0