Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析

简介: Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析

195d03d17afc4a928bc581f313b01dfe.png

概述


Java Review - 并发编程_LinkedBlockingQueue原理&源码剖析

介绍了使用有界链表方式实现的阻塞队列LinkedBlockingQueue,这里我们继续来研究使用有界数组方式实现的阻塞队列ArrayBlockingQueue的原理。


类图结构


23afa6ead3d145169378b992ed525e7e.png


由该图可以看出,ArrayBlockingQueue


内部有一个数组items,用来存放队列元素

putindex变量表示入队元素下标

takeIndex是出队下标

count统计队列元素个数


从定义可知,这些变量并没有使用volatile修饰,这是因为访问这些变量都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。



b800cc57c4884d7fa64f70ca9f41f7f9.png


另外有个独占锁lock用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。

f1d4a9e3a7ef4b3a99a39692908ccf2f.png


另外,notEmpty、notFull条件变量用来进行出、入队的同步。

92067992143642789ba30fe3eff72929.png

构造函数


ArrayBlockingQueue是有界队列,所以构造函数必须传入队列大小参数。

构造函数的代码如下。


2d19a5357a70489cb1e00e1d11d4fdcd.png


    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

由以上代码可知,在默认情况下使用ReentrantLock提供的非公平独占锁进行出、入队操作的同步。


主要方法源码解析


研究过LinkedBlockingQueue的实现后再看ArrayBlockingQueue的实现会感觉后者简单了很多


offer操作


向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。


如果e元素为null则抛出NullPointerException异常。

另外,该方法是不阻塞的。

  public boolean offer(E e) {
      // 1 
        checkNotNull(e);
        // 2 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          // 3 
            if (count == items.length)
                return false;
            else {
              // 4 
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

代码(1) 如果e元素为null则抛出NullPointerException异常

代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入lock锁的AQS阻塞队列。

代码(3)判断如果队列满则直接返回false,否则调用enqueue方法后返回true,enqueue的代码如下

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 6 元素入队
        final Object[] items = this.items;
        items[putIndex] = x;
        // 7 计算下一个元素应该存放的下标位置
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 8 
        notEmpty.signal();
    }


如上代码首先把当前元素放入items数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,最后激活notEmpty的条件队列中因为调用take操作而被阻塞的一个线程。


这里由于在操作共享变量count前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从CPU缓存或者寄存器获取。


代码(5)释放锁,然后会把修改的共享变量值(比如count的值)刷新回主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。


put操作


向队列尾部插入一个元素,如果队列有空闲则插入后直接返回true,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回

另外,如果e元素为null则抛出NullPointerException异常。

public void put(E e) throws InterruptedException {
    // 1 
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 2 获取锁  可被中断
        lock.lockInterruptibly();
        try {
          // 3 如果队列满,这把当前下层放入notFull管理的条件队列
            while (count == items.length)
                notFull.await();
            // 4 插入队列    
            enqueue(e);
        } finally {
          // 5 
            lock.unlock();
        }
    }



在代码(2)中,在获取锁的过程中当前线程被其他线程中断了,则当前线程会抛出InterruptedException异常而退出。


代码(3)判断如果当前队列已满,则把当前线程阻塞挂起后放入notFull的条件队列,注意这里也是使用了while循环而不是if语句。\


代码(4)判断如果队列不满则插入当前元素,此处不再赘述。


poll操作


从队列头部获取并移除一个元素,如果队列为空则返回null,该方法是不阻塞的

  public E poll() {
      // 1 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          // 2 
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }


  • 代码(1)获取独占锁。
  • 代码(2)判断如果队列为空则返回null,否则调用dequeue()方法
   private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
         //  4 获取元素
        E x = (E) items[takeIndex];
        // 5 数组中的值为null 
        items[takeIndex] = null;
        // 6 对头指针计算,队列元素个数减一
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 7 发送信号激活notFull条件队列中的一个线程
        notFull.signal();
        return x;
    }


由以上代码可知,首先获取当前队头元素并将其保存到局部变量,然后重置队头元素为null,并重新设置队头下标,递减元素计数器,最后发送信号激活notFull的条件队列里面一个因为调用put方法而被阻塞的线程


take操作


获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。

 public E take() throws InterruptedException {
    // 1 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          // 2  队列为空则等待,直到队列中有数据 
            while (count == 0)
                notEmpty.await();
            // 3 获取头部元素
            return dequeue();
        } finally {
          // 4  
            lock.unlock();
        }


take操作的代码也比较简单,与poll相比只是代码(2)不同。


在这里,如果队列为空则把当前线程挂起后放入notEmpty的条件队列,等其他线程调用notEmpty.signal()方法后再返回。


需要注意的是,这里也是使用while循环进行检测并等待而不是使用if语句。


peek操作

获取队列头部元素但是不从队列里面移除它,如果队列为空则返回null,该方法是不阻塞的

    public E peek() {
      //  1 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          // 2 
            return itemAt(takeIndex); // null when queue is empty
        } finally {
          // 3 
            lock.unlock();
        }
    }
  final E itemAt(int i) {
        return (E) items[i];
    }

peek的实现更简单,首先获取独占锁,然后从数组items中获取当前队头下标的值并返回,在返回前释放获取的锁。


size

计算当前队列元素个数。

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }


size操作比较简单,获取锁后直接返回count,并在返回前释放锁。


也许你会问,这里又没有修改count的值,只是简单地获取,为何要加锁呢?


其实如果count被声明为volatile的这里就不需要加锁了,因为volatile类型的变量保证了内存的可见性,而ArrayBlockingQueue中的count并没有被声明为volatile的,这是因为count操作都是在获取锁后进行的,


而获取锁的语义之一是,获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。


小结


589ec96d6a80407f848d9ab0628445a4.png

ArrayBlockingQueue通过使用全局独占锁实现了同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加synchronized的意思。


其中offer和poll操作通过简单的加锁进行入队、出队操作,


而put、take操作则使用条件变量实现了,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。


另外,相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。

相关文章
|
11天前
|
存储 缓存 人工智能
【原理】【Java并发】【synchronized】适合中学者体质的synchronized原理
本文深入解析了Java中`synchronized`关键字的底层原理,从代码块与方法修饰的区别到锁升级机制,内容详尽。通过`monitorenter`和`monitorexit`指令,阐述了`synchronized`实现原子性、有序性和可见性的原理。同时,详细分析了锁升级流程:无锁 → 偏向锁 → 轻量级锁 → 重量级锁,结合对象头`MarkWord`的变化,揭示JVM优化锁性能的策略。此外,还探讨了Monitor的内部结构及线程竞争锁的过程,并介绍了锁消除与锁粗化等优化手段。最后,结合实际案例,帮助读者全面理解`synchronized`在并发编程中的作用与细节。
37 8
【原理】【Java并发】【synchronized】适合中学者体质的synchronized原理
|
19天前
|
存储 缓存 安全
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是写出高端的CRUD应用。2025年,我正在沉淀自己,博客更新速度也在加快。在这里,我会分享关于Java并发编程的深入理解,尤其是volatile关键字的底层原理。 本文将带你深入了解Java内存模型(JMM),解释volatile如何通过内存屏障和缓存一致性协议确保可见性和有序性,同时探讨其局限性及优化方案。欢迎订阅专栏《在2B工作中寻求并发是否搞错了什么》,一起探索并发编程的奥秘! 关注我,点赞、收藏、评论,跟上更新节奏,让我们共同进步!
89 8
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
|
12天前
|
消息中间件 Java 应用服务中间件
JVM实战—1.Java代码的运行原理
本文介绍了Java代码的运行机制、JVM类加载机制、JVM内存区域及其作用、垃圾回收机制,并汇总了一些常见问题。
JVM实战—1.Java代码的运行原理
|
1月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
1月前
|
存储 算法 Java
【JAVA】生成accessToken原理
在Java中,生成accessToken用于身份验证和授权,确保合法用户访问受保护资源。流程包括:1. 身份验证(如用户名密码、OAuth 2.0);2. 生成唯一且安全的令牌;3. 设置令牌有效期并存储;4. 客户端传递令牌,服务器验证其有效性。常见场景为OAuth 2.0协议,涉及客户端注册、用户授权、获取授权码和换取accessToken。示例代码展示了使用Apache HttpClient库模拟OAuth 2.0获取accessToken的过程。
|
27天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
158 60
【Java并发】【线程池】带你从0-1入门线程池
|
16天前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
66 23
|
23天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
94 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
1月前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
136 14
|
1月前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
61 13