JAVA concurrency -- 阻塞队列ArrayBlockingQueue源码详解

简介:

JAVA concurrency -- 阻塞队列ArrayBlockingQueue源码详解

概述
ArrayBlockingQueue顾名思义,使用数组实现的阻塞队列。今天我们就来详细讲述下他的代码实现

阻塞队列
什么是阻塞队列?

阻塞队列是一种特殊的队列,使用场景为并发环境下。在某种情况下(当线程无法获取锁的时候)线程会被挂起并且在队列中等待,如果条件具备(锁被释放)那么就会唤醒挂起的线程。

通俗点来讲的话,阻塞队列类似于理发店的等待区,当没有理发师空闲的时候,客人会在等待区等待,一旦有了空闲,就会有人自动递补。

类的继承关系

ArrayBlockingQueue继承了抽象队列,并且实现了阻塞队列,因此它具备队列的所有基本特性。

基本实现原理
ArrayBlockingQueue的实现是基于ReentrantLock以及AQS内部实现的锁机制以及Condition机制。 ArrayBlockingQueue内部声明了两个Condition变量,一个叫notEmpty,一个叫notFull,当有数据加入队列时尝试唤醒notEmpty,当有数据移除队列时则唤醒notFull,从而实现一个类似于生产者消费者模型的机制。

源码分析
类成员变量

// 队列的存储对象数组
final Object[] items;

// 下一个取出的序号
int takeIndex;

// 下一个放入队列的序号
int putIndex;

// 队列中的元素数目
int count;

// 锁以及用来控制队列的两个条件变量
final ReentrantLock lock;

private final Condition notEmpty;

private final Condition notFull;

transient Itrs itrs = null;

构造函数

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// 通用的构造函数,以容量和是否公平锁为参数,余下两个构造函数均调用此函数
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    // 调用构造函数
    this(capacity, fair);

    // 为阻塞队列初始化数据(此操作需要上锁)
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            // 将集合中的数据存放到数组中并且进行判空操作
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        // 修改count和putIndex的值
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

这里有一点疑问,这里明明是构造函数,是类初始化的地方,照理来说不会产生竞争,为什么要进行加锁操作呢?此处原本有一句原版的注释 Lock only for visibility, not mutual exclusion 锁是为了可见性而不是互斥。这句话怎么理解呢?我们仔细观察代码,发现当我们把集合中的数据全部插入队列中之后,我们会修改相应的count以及putIndex的数值,但是如果我们没有加锁,那么在集合插入完成前count以及putIndex没有完成初始化操作的时候如果有其他线程进行了插入等操作的话,会造成数据同步问题从而使得数据不准确,因此这里的锁是必要的。

队列操作
基础队列操作enqueue和dequeue

// 队列的插入操作
private void enqueue(E x) {
    // 本地声明一个item数组的引用
    final Object[] items = this.items;
    // 将元素放入数组中
    items[putIndex] = x;
    // 如果此时已经到了数组的末尾了,将putIndex重置为0
    if (++putIndex == items.length)
        putIndex = 0;
    // 元素数目加1
    count++;
    // 发出通知告诉所有取数据的线程可以取数据
    notEmpty.signal();
}

// 队列的移除操作
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 找到要移除的数据置空
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 如果此时已经到了数组的末尾了,将takeIndex重置为0
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素数目减1
    count--;
    // 迭代器操作,这个之后再说
    if (itrs != null)
        itrs.elementDequeued();
    // 发出通知告知插入线程可以工作
    notFull.signal();
    return x;
}

这两个方法是队列操作的基本方法,基本上就是常规的数组数据插入移除,只是有一点很让人困惑 final Object[] items = this.items; 这段代码实现将类成员对象在本地创建了一个引用,然后在本地使用引用进行操作,为什么要多此一举呢?除此之外,代码中大量用到了这种手法,例如: final ReentrantLock lock = this.lock; 这又是为了什么呢?对此笔者猜测可能是和优化相关,因为jdk7中的实现与之不同,是使用的类变量直接操作。在进行了资料查阅后,笔者找到了一个相对靠谱的解释:

这是ArrayBlockingQueue的作者Doug Lea的习惯,他认为这种书写习惯是对机器更加友好的书写

当然也有一些大神有一些其他的解释:

final本身是不可变的,但是由于反射以及序列化操作的存在,final的不可变性就变得捉摸不定,除此之外一些编译器层面上在final上优化的不够好,导致会在使用到数据的时候反复重载导致缓存失效

希望大家可以自己认真思考下,然后尝试下,得到自己的结论。

阻塞队列的插入操作

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果阻塞队列已满,那么插入失败
        if (count == items.length)
            return false;
        else {
            // 否则插入成功
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

阻塞队列插入操作大致就以上几种,这几种的区别在代码中也体现得比较清楚了:

offer返回的是布尔值,插入成功返回true否则(队列已满)返回false
put没有返回值,假如队列是满的,他会一直阻塞直到队列为空的时候执行插入操作
add实际上调用的就是offer,只是他在加入失败后会抛出异常
阻塞队列的移除操作

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}

poll执行成功会返回队列元素,如果队列为空则直接返回null
take执行成功会返回队列元素,但是如果队列为空他不会返回而是等待有数据插入,然后取出
peek则是直接获取队列元素,并且执行后不会将元素从队列中删除
迭代器实现
由于迭代器和内部队列共享数据,再加上阻塞队列的特性,导致为了实现迭代器功能,需要新增一些很复杂的代码实现。

内部声明了两个类来实现迭代器,一个是Itr继承Iterator,一个则是Itrs。

Itrs
Itrs是用来管理迭代器的。由于阻塞队列内部可能会有多个迭代器在同时工作,在迭代器内部发生删除或者是一些不常见的操作时可能会产生一些问题,比如他们会丢失自己的数据之类的。所以Itrs内部会维护一个变量用于记录循环的圈数,并且在删除操作removeAt的时候会通知所有的迭代器。

class Itrs {
    // 创建一个Node类作为单向链表(节点是弱引用)来管理迭代器
    private class Node extends WeakReference<Itr> {
        Node next;

        Node(Itr iterator, Node next) {
            super(iterator);
            this.next = next;
        }
    }

    // 循环圈数
    int cycles = 0;

    // 链表头
    private Node head;

    // 清理相关的变量
    private Node sweeper = null;

    private static final int SHORT_SWEEP_PROBES = 4;
    private static final int LONG_SWEEP_PROBES = 16;

    Itrs(Itr initial) {
        register(initial);
    }

    // 清理无效的迭代器(如果sweeper为空,则从头开始,否则从sweeper记录的节点开始)
    void doSomeSweeping(boolean tryHarder) {
        
    }

    // 新增加一个迭代器
    void register(Itr itr) {
        head = new Node(itr, head);
    }

    // 当takeIndex为0时调用此方法
    void takeIndexWrapped() {
        // cycle数+1,内部实现通知所有迭代器并进行清理(链表遍历)
    }

    // 有移除操作的时候调用此方法,并通知所有迭代器进行清理
    void removedAt(int removedIndex) {
        // 简单的链表遍历,内部调用Itr的removedAt方法
    }

    // 当发现队列为空的时候调用此方法,清理迭代器内的弱引用
    void queueIsEmpty() {
        
    }

    // 有元素被取时是调用
    void elementDequeued() {
        // 如果数组为空调用queueIsEmpty进行清理
        if (count == 0)
            queueIsEmpty();
        // 如果takeIndex为0,调用takeIndexWrapped,来进行循环+1操作
        else if (takeIndex == 0)
            takeIndexWrapped();
    }
}

Itr
Itrs是管理迭代器的,Itr则是迭代器的具体实现

private class Itr implements Iterator<E> {
    // 游标,用于寻找下一个元素
    private int cursor;

    // 下一个元素
    private E nextItem;

    // 下一个元素的下标
    private int nextIndex;

    // 上一个元素
    private E lastItem;

    // 上一个元素的下标
    private int lastRet;

    // 上一个take的下标
    private int prevTakeIndex;

    // 上一个循环
    private int prevCycles;

    // 标记为空
    private static final int NONE = -1;

    // 删除标记
    private static final int REMOVED = -2;

    // DETACH标记专用于prevTakeIndex
    private static final int DETACHED = -3;

    Itr() {
        // 这是构造函数,内部实现主要是初始化为主,
        // 并且在Itrs不为空的时候进行一波清理操作
    }

    boolean isDetached() {
        return prevTakeIndex < 0;
    }

    private int incCursor(int index) {
        // 游标+1,并重新计算值(判断是否走完一个循环,是否等于putIndex)
        if (++index == items.length)
            index = 0;
        if (index == putIndex)
            index = NONE;
        return index;
    }

    // 判断给的删除数是否是有效值
    private boolean invalidated(int index, int prevTakeIndex,
                                long dequeues, int length) {
        
    }

    // 计算在迭代器的上一次操作后所有的删除(出队)操作
    private void incorporateDequeues() {
        // 主要方法为通过当前圈数和之前的圈数以及偏移量计算
        // 真实的删除数,并且和prevTakeIndex以及index的偏移量进行比较
    }

    // 进行detach操作并进行清理
    private void detach() {
        
    }

    // 判断是否有下一个节点
    public boolean hasNext() {
        
    }

    // 没有下一个节点(没有detach的节点将会被执行detach操作)
    private void noNext() {
        
    }

    // 找到下个节点
    public E next() {
        // 实现不复杂,主要是需要判断节点是否是detach模式
    }

    // 删除节点
    public void remove() {
        
    }

    // 当队列为空或者后续很难找到下个节点的时候通知迭代器
    void shutdown() {
        
    }

    // 辅助计算游标和prevTakeIndex之间的距离
    private int distance(int index, int prevTakeIndex, int length) {
        
    }

    // 删除节点
    boolean removedAt(int removedIndex) {
        
    }

    // 当takeIndex归0时调用
    boolean takeIndexWrapped() {
        
    }
}

总结
ArrayBlockingQueue的实现可以说是比较的简单清晰,主要是利用了ReentrantLock内部的Condition,通过设置两个条件来巧妙地完成阻塞队列的实现,只要能够理解这两个条件的工作原理,源码的理解就没有太大的难度。ArrayBlockingQueue较难理解的反而是它内部的迭代器,由于阻塞队列的特性,他的迭代器可能会有丢失当前数据的风险,因此,作者创作的时候加入了许多复杂的方法来保证可靠性,但是在这里由于篇幅限制,以及迭代器在阻塞队列中的地位和重要性并不高,所以简单讲述,如果有兴趣可以自己找一份源码阅读。

原文地址https://my.oschina.net/bjwzds/blog/3237769

相关文章
|
2天前
|
监控 安全 NoSQL
采用java+springboot+vue.js+uniapp开发的一整套云MES系统源码 MES制造管理系统源码
MES系统是一套具备实时管理能力,建立一个全面的、集成的、稳定的制造物流质量控制体系;对生产线、工艺、人员、品质、效率等多方位的监控、分析、改进,满足精细化、透明化、自动化、实时化、数据化、一体化管理,实现企业柔性化制造管理。
21 3
|
3天前
|
存储 Java
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
Java基础复习(DayThree):字符串基础与StringBuffer、StringBuilder源码研究
|
3天前
|
数据采集 监控 安全
java数字工厂MES系统全套源码Java+idea+springboot专业为企业提供智能制造MES解决方案
"MES" 指的是制造执行系统(Manufacturing Execution System)。MES在制造业中扮演着至关重要的角色,它是位于企业资源计划(ERP)系统和车间控制系统之间的系统,用于实时收集、管理、分析和报告与制造过程相关的数据。
10 0
|
3天前
|
移动开发 监控 供应链
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
一开始接触MES系统,很多人会和博主一样,对MES细节的应用不了解,这样很正常,因为MES系统相对于其他系统来讲应用比较多!
15 1
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
|
4天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
4天前
|
存储 运维 Java
java云his系统源码一站式诊所SaaS系统Java版云HIS系统 八大特点
HIS系统采用面向技术架构的分析与设计方法,应用多层次应用体系架构设计,运用基于构件技术的系统搭建模式与基于组件模式的系统内核结构。通过建立统一接口标准,实现数据交换和集成共享,通过统一身份认证和授权控制,实现业务集成、界面集成。
29 1
|
5天前
|
Java 关系型数据库 MySQL
java+B/S架构医院绩效考核管理系统源码 医院绩效管理系统4大特点
医院绩效考核管理系统,采用多维度综合绩效考核的形式,针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求,对考核方案中各维度进行灵活配置,对各维度的权重、衡量标准、数据统计方式进行自定义维护。
13 0
|
5天前
|
Java 数据挖掘 BI
Java医院绩效考核系统源码B/S+avue+MySQL助力医院实现精细化管理
医院绩效考核系统目标是实现对科室、病区财务指标、客户指标、流程指标、成长指标的全面考核、分析,并与奖金分配、学科建设水平评价挂钩。
32 0
|
5天前
|
数据采集 前端开发 Java
Java医院绩效考核系统源码maven+Visual Studio Code一体化人力资源saas平台系统源码
医院绩效解决方案包括医院绩效管理(BSC)、综合奖金核算(RBRVS),涵盖从绩效方案的咨询与定制、数据采集、绩效考核及反馈、绩效奖金核算到科到组、分配到员工个人全流程绩效管理;将医院、科室、医护人员利益绑定;全面激活人才活力;兼顾质量和效益、长期与短期利益;助力医院降本增效,持续改善、优化收入、成本结构。
19 0
|
5天前
|
监控 前端开发 Java
Java基于B/S医院绩效考核管理平台系统源码 医院智慧绩效管理系统源码
医院绩效考核系统是一个关键的管理工具,旨在评估和优化医院内部各部门、科室和员工的绩效。一个有效的绩效考核系统不仅能帮助医院实现其战略目标,还能提升医疗服务质量,增强患者满意度,并促进员工的专业成长
22 0