基于数组的有界阻塞队列 —— ArrayBlockingQueue

简介: 在阅读完和 AQS 相关的锁以及同步辅助器之后,来一起阅读 JUC 下的和队列相关的源码。先从第一个开始:ArrayBlockingQueue。

网络异常,图片无法展示
|


前言


在阅读完和 AQS 相关的锁以及同步辅助器之后,来一起阅读 JUC 下的和队列相关的源码。先从第一个开始:ArrayBlockingQueue。


介绍


由数组支持的有界BlockingQueue阻塞队列。


这个队列的命令元素FIFO(先入先出)。 队列的头是元素一直在队列中时间最长。 队列的尾部是该元素已经在队列中的时间最短。 新元素插入到队列的尾部,并且队列检索操作获取在队列的头部元素。


这是一个典型的“有界缓冲区”,在其中一个固定大小的数组保持由生产者插入并受到消费者的提取的元素。 一旦创建,容量不能改变。 试图put 一个元素到一个满的队列将导致操作阻塞; 试图 take 从空队列一个元素将类似地阻塞。


此类支持订购等待生产者和消费者线程可选的公平政策。 默认情况下,这个顺序不能保证。 然而,队列公平设置为构建 true 保证线程以FIFO的顺序进行访问。 公平性通常会降低吞吐量,但减少了可变性和避免饥饿。


基本使用

public class ArrayBlockingQueueTest {
    private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(10);
    private static final CountDownLatch LATCH = new CountDownLatch(2);
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        pool.submit(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(1000L);
                    QUEUE.put("鸡蛋" + Thread.currentThread().getName());
                    System.out.println("put 放入元素");
                } catch (InterruptedException ignored) {
                }
            }
            LATCH.countDown();
        });
        pool.submit(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(500L);
                    String take = QUEUE.take();
                    System.out.println("take = " + take);
                } catch (InterruptedException ignored) {
                }
            }
            LATCH.countDown();
        });
        try {
            LATCH.await();
        } catch (InterruptedException ignored) {
        }
        pool.shutdown();
    }
}

demo 只是临时写的一个,很简单的版本。


问题疑问

  1. ArrayBlockingQueue 的实现原理是什么?
  2. 入队列和出队列方法之间的区别是什么?


源码分析


基本结构

网络异常,图片无法展示
|


参数介绍

/** 数组 - 存储队列中的元素 */
final Object[] items;
/** 下一个 take, poll, peek or remove 的索引 */
int takeIndex;
/** 下一个 put, offer, or add 的索引 */
int putIndex;
/** 队列中的元素数 */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** take 操作时是否等待 */
private final Condition notEmpty;
/** put 操作时是否等待 */
private final Condition notFull;


构造函数

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
// 指定容量,及是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
// 初始化的时候放入元素
public ArrayBlockingQueue(int capacity, boolean fair,
                            Collection<? extends E> c) {
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}


添加元素

public boolean add(E e) {
    return super.add(e);
}
// 父类的方法,其实调用的也是 offer
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
// 使用锁
public boolean offer(E e) {
    checkNotNull(e);
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
// 放入元素, 如果队列满了,则等待
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
  1. add 方法:调用的是父类 AbstractQueue 的 add 方法,内部调用的是 offer 方法,如果 offer 返回 false,则抛出异常。
  2. offer 方法:校验元素非空,加互斥锁,如果队列满了,则返回 false,如果队列未满,则调用 enqueue 方法,添加元素。
  3. put 方法:校验元素非空,加互斥锁,如果队列满了,则一直自旋等待,队列未满则调用 enqueue 方法,添加元素。


所以下面还是需要看一下 enqueue 方法:

// 只有在获取锁的时候才可以调用
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // putIndex 下一个 put, offer, or add 的索引
    // 对其进行赋值,然后进行 ++putIndex 操作
    items[putIndex] = x;
    // 如果等于长度,则指定为开始
    if (++putIndex == items.length)
        putIndex = 0;
    // 对元素数进行 ++
    count++;
    // 有元素入队列,唤醒在等待获取元素的线程
    notEmpty.signal();
}


获取元素

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}


通过源码可以看出:

  1. pool 和 take 都是从队列中获取元素,二者不同的是,当队列中没有元素时,poll 方法返回 null,而 take 方法会一直阻塞等待,直到从队列中获取到元素。
  2. poll 和 take 方法获取元素都是调用的 dequeue 方法。
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 获取元素并将元素置为 null
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // takeIndex 下一个 take, poll, peek or remove 的索引
    // 指向下一个元素,并且 元素数减少
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    // 更新迭代器状态
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒等待放入元素的线程
    notFull.signal();
    return x;
}


查看元素

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}


总结


Q&A


Q: ArrayBlockingQueue 的实现原理?

A: ArrayBlockingQueue 是基于数组实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。


Q: 入队列和出队列方法之间的区别是什么?

方法 作用
add 添加元素,队列满了,添加失败抛出遗产
offer 添加元素, 队列满了,添加失败,返回 false
put 添加元素,队列满了,阻塞等待
poll 弹出元素,队列为空则返回 null
take 弹出元素,队列为空则等待队列中有元素
peek 查看队列中放入最早的一个元素


结束语


ArrayBlockingQueue 中使用了 ReentrantLock 互斥锁,在元素入队列和出队列的时候都进行了加锁,所以同时只会有一个线程进行入队列或者出队列,从而保证线程安全。

目录
相关文章
|
3月前
|
安全 Java 应用服务中间件
Spring Boot + Java 21:内存减少 60%,启动速度提高 30% — 零代码
通过调整三个JVM和Spring Boot配置开关,无需重写代码即可显著优化Java应用性能:内存减少60%,启动速度提升30%。适用于所有在JVM上运行API的生产团队,低成本实现高效能。
296 3
|
2月前
|
Arthas 监控 数据可视化
深入理解JVM《JVM监控与性能工具实战 - 系统的诊断工具》
掌握JVM监控与诊断工具是Java性能调优的关键。本文系统介绍jps、jstat、jmap、jstack等命令行工具,以及jconsole、VisualVM、JMC、Arthas、async-profiler等可视化与高级诊断工具,涵盖GC分析、内存泄漏定位、线程死锁检测及CPU热点追踪,助力开发者全面提升线上问题排查能力。(238字)
|
6月前
|
存储 运维 安全
OSS安全合规实战:金融行业敏感数据加密+KMS自动轮转策略(满足等保2.0三级要求)
金融行业OSS面临等保2.0、行业监管及数据泄露三重合规挑战,存在存储加密不足、密钥轮转滞后、访问控制不当等问题。本文提出分层加密架构,结合服务端KMS与客户端加密,设计自动密钥轮转机制,实现高性能与合规兼顾,并提供故障排查与成本优化方案,助力金融机构安全落地OSS应用。
292 1
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
1707 31
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
消息中间件 负载均衡 Kafka
Kafka分区分配策略大揭秘:RoundRobin、Range、Sticky,你真的了解它们吗?
【8月更文挑战第24天】Kafka是一款突出高吞吐量、可扩展性和数据持久性的分布式流处理平台。其核心特性之一是分区分配策略,对于实现系统的负载均衡和高可用性至关重要。Kafka支持三种主要的分区分配策略:RoundRobin(轮询)、Range(范围)和Sticky(粘性)。RoundRobin策略通过轮询方式均衡分配分区;Range策略根据主题分区数和消费者数量分配;而Sticky策略则在保持原有分配的基础上动态调整,以确保各消费者负载均衡。理解这些策略有助于优化Kafka性能并满足不同业务场景需求。
1167 59
|
JSON Java 定位技术
【Android App】GPS获取定位经纬度和根据经纬度获取详细地址讲解及实战(附源码和演示 超详细)
【Android App】GPS获取定位经纬度和根据经纬度获取详细地址讲解及实战(附源码和演示 超详细)
5004 1
|
程序员
简历竟然敢写精通并发编程,那你说说AQS为什么要用双向链表?
一位工作4年的程序员 , 简历上写了精通并发编程 , 并且还阅读过AQS( AbstractQueuedSynchronizer)的源码,然后面试官只问了这样一个问题:“AQS 为什么要采用双向链表结构”?,然后就垮了! 其实AQS 大家都不陌生,它是 J.U.C 包里面一个非常重要的线程同步器。今天,我给大家聊聊我的理解。
423 1
|
12月前
|
消息中间件 Java Kafka
MQ四兄弟:如何保证消息顺序性
在分布式系统中,消息队列(MQ)是确保组件间高效通信的关键。RabbitMQ、RocketMQ、Kafka和Pulsar通过不同机制保证消息顺序性:RabbitMQ依赖单一队列和消费者模式;RocketMQ使用MessageQueueSelector;Kafka基于Partition和Key;Pulsar通过分区主题和键路由。这些系统的核心思想是将相同特征的消息发送到同一队列或分区,并按先进先出原则消费,从而确保消息顺序性。
786 0
|
存储 安全 Java
深入探索Java并发编程:ArrayBlockingQueue详解
深入探索Java并发编程:ArrayBlockingQueue详解
|
Java 调度 开发者
揭秘Java并发包(JUC)的基石:AQS原理和应用
揭秘Java并发包(JUC)的基石:AQS原理和应用

热门文章

最新文章