Java并发编程之LinkedBlockingQueue

简介: Java并发编程之LinkedBlockingQueue

LinkedBlockingQueue:



主要成员变量:


public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
     /**
     * 链表节点类
     */
    static class Node<E> {
        E item;
        //节点的后继
        Node<E> next;
        Node(E x) { item = x; }
    }
    /** 阻塞队列的最大容量, 如果未设置的话,该值为:Integer.MAX_VALUE */
    private final int capacity;
    /** 队列中当前的元素个数 */
    private final AtomicInteger count = new AtomicInteger();
    /**
     * 链表的头结点.
     * 头结点不存放具体元素:head.item == null
     */
    transient Node<E> head;
    /**
     * 链表的尾节点.
     * 尾节点的后继为null: last.next == null
     */
    private transient Node<E> last;
    /** 出队锁,take, poll操作时需要加该锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出队条件 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入队锁,put, offer操作时需要加该锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入队条件 */
    private final Condition notFull = putLock.newCondition();
 }


LinkedBlockingQueue内部的具体实现是一个自定义链表,维护了链表的头结点和尾节点,从头结点出队,从尾节点入队,从而实现了队列的功能。


2.阻塞有两重含义:


(1)当队列满了的时候阻塞入队,也就是说如果有线程往队列里面添加元素,如果此时队列已满,那么该线程将被阻塞直到其他线程消费队列里的元素为止。


(2)当队列为空的时候阻塞出队,也就是说如果有线程从队列里面获取元素,如果此时队列为空,那么该线程将被阻塞直到其他线程往队列里面写入元素为止。


3.入队操作:


/**
     * 向队列尾部插入一个元素,如果没有空间则阻塞直到空间可用
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public void put(E e) throws InterruptedException {
    //不能写入空元素
    if (e == null) throw new NullPointerException();
    // 标记写入时队列的元素个数
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //先要获取入队锁
    putLock.lockInterruptibly();
    try {
        /*
             * 循环检测队列是否已满,如果已满则阻塞
             * 考虑这种场景:A入队线程在等待入队条件,B线程消费了1个元素
             * 此时刚好C线程获取入队锁,进行入队操作,那么此时A线程被唤醒后
             * 必须要再次判断队列是否已满,否则会出现队列元素比容量大的问题
             * 所以此处要循环判断
             */
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队操作,在锁内部是安全的,修改链表指针位置即可
        enqueue(node);
        //获取入队时的元素个数,并将元素个数加1
        c = count.getAndIncrement();
        //如果入队后,队列仍未满,则唤醒其他等待写入的线程
        //考虑这种场景:A,B两个入队线程都在等待入队条件,C,D两个线程同时消费了2个元素
        //此时A,B都可以入队,但是入队操作只会唤醒一次(c==capacity),假如唤醒了A,
        //那么,A入队后要判断是否还有空间,继续唤醒B
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果写入时,队列里没有元素,则唤醒出队线程;因为此时可能有出队线程在等待
    if (c == 0)
        signalNotEmpty();
}
/**
     * 向队列尾部插入一个元素,若空间已满则最多等待指定 时间.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //与put一样,都是先获取锁
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            //等待超时,返回
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
/**
     * 向队列尾部插入元素,如果队列已满直接返回fasle;否则,尝试写入,写入成功返回true,失败返回false
     * @throws NullPointerException if the specified element is null
     */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //队列已满,直接返回
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //先获取写入锁
    putLock.lock();
    try {
        //判断队列是否已满
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    //判断释放写入,如果写入成功c != -1
    return c >= 0;
}


4.出队操作:


//从队列头部出队,如果队列为空则阻塞
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //循环判断队列是否为空,若为空,则阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队,在锁内部,修改头节点指针即可
        x = dequeue();
        //获取出队前队列元素个数,并将元素个数减一
        c = count.getAndDecrement();
        //如果出队后还有元素,则唤醒其他出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果出队前空间已满,则说明此时出队后,还有1个位置,此时唤醒入队线程
    if (c == capacity)
        signalNotFull();
    return x;
}
//从队列头部出队,如果队列为空则阻塞指定的时间,获取成功返回元素,否则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lockInterruptibly();
    try {
        //此处的循环判断原因,同put内部的循环判断
        while (count.get() == 0) {
            //获取超时返回空
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //同上
    if (c == capacity)
        signalNotFull();
    return x;
}
//出队,如果队列为空则返回null
public E poll() {
    final AtomicInteger count = this.count;
    //队列为空,立即返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lock();
    try {
        //队列不为空,则获取
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
//获取队头元素,此操作不出队,仅仅是读取
public E peek() {
    //队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁,但是不会出队,仅仅读取
    takeLock.lock();
    try {
        //读取队头元素
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}


5.可以看到JDK内部是通过一个单向链表的数据结构,配合入队的ReentrantLock以及出队的ReentrantLock最终实现了阻塞队列的语义。

目录
相关文章
|
4月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
232 6
|
4月前
|
IDE Java 编译器
java编程最基础学习
Java入门需掌握:环境搭建、基础语法、面向对象、数组集合与异常处理。通过实践编写简单程序,逐步深入学习,打牢编程基础。
294 1
|
5月前
|
SQL Java 数据库
2025 年 Java 从零基础小白到编程高手的详细学习路线攻略
2025年Java学习路线涵盖基础语法、面向对象、数据库、JavaWeb、Spring全家桶、分布式、云原生与高并发技术,结合实战项目与源码分析,助力零基础学员系统掌握Java开发技能,从入门到精通,全面提升竞争力,顺利进阶编程高手。
1050 0
|
4月前
|
安全 前端开发 Java
从反射到方法句柄:深入探索Java动态编程的终极解决方案
从反射到方法句柄,Java 动态编程不断演进。方法句柄以强类型、低开销、易优化的特性,解决反射性能差、类型弱、安全性低等问题,结合 `invokedynamic` 成为支撑 Lambda 与动态语言的终极方案。
219 0
|
6月前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
893 3
|
5月前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
516 100
|
5月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
429 16
|
5月前
|
NoSQL Java 关系型数据库
超全 Java 学习路线,帮你系统掌握编程的超详细 Java 学习路线
本文为超全Java学习路线,涵盖基础语法、面向对象编程、数据结构与算法、多线程、JVM原理、主流框架(如Spring Boot)、数据库(MySQL、Redis)及项目实战等内容,助力从零基础到企业级开发高手的进阶之路。
438 1
|
6月前
|
安全 算法 Java
Java泛型编程:类型安全与擦除机制
Java泛型详解:从基础语法到类型擦除机制,深入解析通配符与PECS原则,探讨运行时类型获取技巧及最佳实践,助你掌握泛型精髓,写出更安全、灵活的代码。
|
6月前
|
安全 Java Shell
Java模块化编程(JPMS)简介与实践
本文全面解析Java 9模块化系统(JPMS),帮助开发者解决JAR地狱、类路径冲突等常见问题,提升代码的封装性、性能与可维护性。内容涵盖模块化核心概念、module-info语法、模块声明、实战迁移、多模块项目构建、高级特性及最佳实践,同时提供常见问题和面试高频题解析,助你掌握Java模块化编程精髓,打造更健壮的应用。