【源码】【Java并发】【ArrayBlockingQueue】适合中学者体质的ArrayBlockingQueue

简介: 前言 通过之前的学习是不是学的不过瘾,没关系,马上和主播来挑战源码的阅读 【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门 还有一件事

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中... 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》

前言

通过之前的学习是不是学的不过瘾,没关系,马上和主播来挑战源码的阅读

【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门

👆🤓还有一件事,因为ArrayBlokcingQueue中用到了ReentrantLock,下面的内容,主播默认你是懂ReentrantLock的!

👆🤓还有一件事,不懂的小朋友也不会伤心,也欢迎看看这个:

【Java并发】【ReentrantLock】适合初学体质的ReentrantLock入门

ArrayBlokcingQueue简单介绍

首先是我们的继承关系,简单看看就行了😄:

image.png

为了方便等下看源码,主播简单的画了下,ArrayBlockingQueue的结构,大概如下图所示:

image.png

i tems: 队列中的元素。

cout: 当前阻塞队列的元素数量。

lock: ReentrantLock锁。

putIndex: put方法、offer方法、add方法的下一个index

takeIndex: take方法、poll方法、peek方法、remove方法的下一个index

ArrayBlokcingQueue的初始化

你是否会好奇,这个items数组有多大?这个ReentrantLock什么时候赋值?

/** The queued items */
final Object[] items;

嘿嘿,聪明的你一定知道ArrayBlockingQueue是一个固定大小的数组。

没错ArrayBlockingQueue的数组items的大小,由构造的时候就确定了。

下面我们看看构造方法都干了什么:

public ArrayBlockingQueue(int capacity) {
   
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
   
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];        // items的大小为,我们入参传的大小
    lock = new ReentrantLock(fair);            // 初始化锁,公平/非公平由入参fair决定
    notEmpty = lock.newCondition();            // 初始化Condition
    notFull =  lock.newCondition();            // 初始化Condition
}

还有一种,一开始就传了数组初始值:

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
   
    this(capacity, fair);

    // 这里加锁是为了防止并发问题,虽然items初始化出来了,但是下面要给items添加c集合的元素。
    // 避免其他线程添加元素导致线程安全问题。
    final ReentrantLock lock = this.lock;
    lock.lock(); 
    try {
   
        int i = 0;
        try {
   
            for (E e : c) {
   
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
   
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
   
        lock.unlock();
    }
}

生产者方法

put方法

众所周知,put方法会阻塞当前线程

// BlockingQueue#put
void put(E e) throws InterruptedException;

让我们看看ArrayBlockingQueue是如何实现这个方法的:

// ArrayBlockingQueue#put
public void put(E e) throws InterruptedException {
   
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 来来来,宝贝们看过来啦!!---  lock.lockInterruptibly()的使用场景
    lock.lockInterruptibly();
    try {
   
        // 这里又是什么?诶,对啦,await()的使用场景,之前学过ReentrantLock的小朋友是不是觉得很熟悉?
        // 阻塞队列满了,阻塞当前想要put元素的线程
        while (count == items.length)
            notFull.await(); 
        enqueue(e);
    } finally {
   
        lock.unlock();
    }
}

那如果当前阻塞队列不满,我们要干嘛?enqueue方法:

说说putIndex的变化逻辑:每次放入元素后,putIndex++,判断是否等于数组长度,如果是就重置为0。

// ArrayBlockingQueue#enqueue
private void enqueue(E x) {
   
    // 将元素放入阻塞队列中
    final Object[] items = this.items;
    items[putIndex] = x;
    // 如果putIndex等于数组的长度了,我们就将索引重置为0
    if (++putIndex == items.length)
        putIndex = 0;
    count++;            
    // signal队列不为空的信号,让消费者线程消费
    notEmpty.signal();    
}

offer方法

offer方法有两种,一种是只带元素,另一种带有时间参数的。

只带元素的offer方法:非阻塞的添加元素,如果队列满了,那就直接返回false,表示添加失败,正常添加元素成功的话,就返回true。

public boolean offer(E e) {
   
    // 放入的元素为空,就直接抛出空指针异常
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
   
        // 如果队列满了,就直接返回false
        if (count == items.length)
            return false;
        else {
   
            // 和上面同样的enqueu方法添加元素,然后返回true
            enqueue(e);
            return true;
        }
    } finally {
   
        lock.unlock();
    }
}

带时间参数的offer方法:timeout超时时间,unit时间单位。等待超时的话,就返回false,添加元素成功就返回true。

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
   
    // 元素为空直接抛空指针异常
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        // 当前队列
        while (count == items.length) {
   
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        // 之后放入
        enqueue(e);
        return true;
    } finally {
   
        lock.unlock();
    }
}

为什么不建议使用add方法?

简单来说,就是ArrayBlockingQueue直接调用父类AbstractQueue的实,如果队列满了,会抛出异常:

// ArrayBlockingQueue#add
public boolean add(E e) {
   
    return super.add(e);
}

// 父类实现 AbstractQueue#add
public boolean add(E e) {
   
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");    // 队列满了,直接抛异常
}

消费者方法

take方法

阻塞获取队列中的元素,让我们来到方法的入口:

// BlockingQueue#take
E take() throws InterruptedException;

ArrayBlockingQueue对take的具体实现:

// ArrayBlockingQueue#take
public E take() throws InterruptedException {
   
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        // 当阻塞队列中没有元素,就阻塞该线程
        while (count == 0)
            notEmpty.await();
        return dequeue();    // 具体的取数据
    } finally {
   
        lock.unlock();
    }
}

具体的取数据方法,dequeue,简单来说就是:

  1. 返回最先进入队列的元素,并将items数组 takeIndex位置的元素设置为null
  2. 变更takeIndex(当前takeIndex等于items数组长度,就重置为0,不然就++)。
// ArrayBlockingQueue#dequeue
private E dequeue() {
   
    final Object[] items = this.items;
    // 获取元素
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 变更takeIndex,当前takeIndex等于items数组长度,就重置为0,不然就++
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 修改总数量
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒生产者线程
    notFull.signal();
    return x;
}

poll方法

poll方法和和offer方法一样,有直接非阻塞获取的,也有阻塞一段获取的两种方法。

没有任何参数的普通poll方法,如果队列为空,直接就返回null,不为空,返回最先进入队列的元素:

// ArrayBlockingQueue#poll()
public E poll() {
   
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
   
        return (count == 0) ? null : dequeue();    // 获取队列中最先进的元素,上面说take方法的时候有细说
    } finally {
   
        lock.unlock();
    }
}

等待一段时间的poll方法,等待设置的时间:

  1. 队列中有元素,直接返回
  2. 队列中没有元素,判断是否超时
    • 没有超时,挂起线程等待
    • 超时,返回null
// ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        // 队列为空,判断是否超时,超时返回null,没有超时就挂起线程等待
        while (count == 0) {
   
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();    // 获取队列中最先进的元素,上面说take方法的时候有细说
    } finally {
   
        lock.unlock();
    }
}

为什么不建议使用remove方法?

因为 如果队列为空,直接抛出NoSuchElementException异常。

public E remove() {
   
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();    // 队列为空,抛出异常
}

Q:count数量的更新会有线程安全问题吗?

这里突然插一个问题,就是想看看各位对上面的源码的理解。

其实是线程安全的,因为我们对count的操作前,都是需要获取到锁的,单线程操作是没有线程安全的问题的。

// ArrayBlockingQueue#take
public E take() throws InterruptedException {
   
    final ReentrantLock lock = this.lock;    // 获取锁资源
    lock.lockInterruptibly();
    try {
   
        while (count == 0)
            notEmpty.await();
        return dequeue();    // 具体的取数据
    } finally {
   
        lock.unlock();
    }
}

// ArrayBlockingQueue#dequeue
private E dequeue() {
   
    ...
    count--;    // 修改count数量
....

后话

这篇,主播围绕着阻塞队列ArrayBlockingQueue的源码的角度,来看看如何阻塞的对队列进行操作。

比如阻塞、非阻塞、超时阻塞。

看完之后,聪明的你看完之后,也没有觉得ArrayBlockingQueue好像也就这样?没有关系,下一篇,主播将看LinkedBlockingQueue的源码,小手手点上关注,跟上主播的节奏!!!

目录
相关文章
|
26天前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
119 7
|
1月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
1月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
76 5
|
1月前
|
存储 Java
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
前言 下面,跟上主播的节奏,马上开始ThreadLocal源码的阅读( ̄▽ ̄)" 内部结构 如下图所示,我们可以知道,每个线程,都有自己的threadLocals字段,指向ThreadLocalMap
400 81
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
|
1月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
242 70
|
8天前
|
JavaScript Java 关系型数据库
家政系统源码,java版本
这是一款基于SpringBoot后端框架、MySQL数据库及Uniapp移动端开发的家政预约上门服务系统。
家政系统源码,java版本
|
19天前
|
供应链 JavaScript 前端开发
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
100 23
|
1月前
|
存储 安全 Java
【Java并发】【原子类】适合初学体质的原子类入门
什么是CAS? 说到原子类,首先就要说到CAS: CAS(Compare and Swap) 是一种无锁的原子操作,用于实现多线程环境下的安全数据更新。 CAS(Compare and Swap) 的
65 15
【Java并发】【原子类】适合初学体质的原子类入门
|
1月前
|
Java
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
因为本文说的是ReentrantLock源码,因此会默认,大家对AQS有基本的了解(比如同步队列、条件队列大概> 长啥样?)。 不懂AQS的小朋友们,你们好呀!也欢迎先看看这篇
81 13
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
|
1月前
|
Java
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
72 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap

热门文章

最新文章