面试官: 阻塞队列的底层实现有了解过吗? 说说看

简介: 面试官: 阻塞队列的底层实现有了解过吗? 说说看

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~


本节以ArrayBlockingQueue为例, 带大家看下阻塞队列是如何实现,一起来看下吧~


ArrayBlockingQueue 源码分析

构造函数

同样的,我们先从它的构造函数看起

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
}
复制代码


  • capacity 固定容量大小
  • false,这个字段名称其实是fair 默认下它是false, 非公平锁


上节我们使用的就是它的默认用法,公平锁和非公平锁我们之前讲过,可以查看以往文章(ReentrantLock源码分析)。下面我们接着看:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
复制代码


从上面的代码来看,可知capacity > 0,第一个构造函数的this()其实就是调的这个构造函数,我们可以通过它来指定容量和访问策略(fair 和 nofair)ArrayBlockingQueue


再接着看最后一个构造函数

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        // 锁用于可见性
        lock.lock(); 
        try {
            int i = 0;
            try {
                // 迭代集合
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
                // 捕获异常 越界
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
}
复制代码

从代码来看,对比上一个多了个Collection,这是干嘛的呢?它允许我们在创建的时候初始化一个集合进去,按迭代顺序添加到容器,从它的内部我们也可以看出来


内部变量

// 队列的元素
    final Object[] items;
    // 获取下一个元素时的索引
    int takeIndex;
    // 下一个添加元素时的索引
    int putIndex;
   // 队列的元素数量
    int count;
    // 全局锁
    final ReentrantLock lock;
    // 等待条件
    private final Condition notEmpty;
    private final Condition notFull;
    // 迭代器的共享状态
    transient Itrs itrs = null;
复制代码


内部方法

add() & offer()

我们看下add方法,这个方法用于向队列中添加元素

public boolean add(E e) {
    return super.add(e);
}
复制代码


内部调用了父类的方法,它继承了AbstractQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {....}
复制代码


接着看AbstractQueueadd()

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
复制代码


可以看到内部调用了offer(), 如果添加成功就返回true,失败就抛出异常, 这符合我们上节使用时它的特点


但是,我们发现在它的内部并没有offer方法,所以实现不在AbstractQueue,实现还是在ArrayBlockingQueue


来看下ArrayBlockingQueueoffer()方法

public boolean offer(E e) {
        // 判断元素 e 是否为空,空抛出 NullPointerException 异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 需要持锁
        lock.lock();
        try {
            // 如果元素已满 返回false, 对标 add 就会抛出异常
            if (count == items.length)
                return false;
            else {
                // 添加到队列中
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
}
复制代码


看下enqueue:

private void enqueue(E x) {
        final Object[] items = this.items;
        // 将元素添加到预期的索引位置
        items[putIndex] = x;
        // 如果下个索引值等于容器数量值 将putIndex归0
        if (++putIndex == items.length)
            putIndex = 0;
        // 容器元素数量+1    
        count++;
        // 唤醒等待的线程
        notEmpty.signal();
    }
复制代码


remove & poll

先看第一个 remove(), 同样的这个方法存在于 AbstractQueue内部,如果被移除的元素为null则抛出异常

public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
}
复制代码


poll()的实现在ArrayBlockingQueue,内部实现方式跟add很像

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
复制代码


private E dequeue() {
        final Object[] items = this.items;
        // 这个注解用于忽略一些警告 这不是重点
        @SuppressWarnings("unchecked")
        // 取出元素 takeIndex 按照 FIFO
        E x = (E) items[takeIndex];
        // 元素取出时 置为空
        items[takeIndex] = null;
        // 判断下一个元素的位置
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒等待的线程    
        notFull.signal();
        // 返回元素
        return x;
    }
复制代码


第二个remove(e), 这个实现在ArrayBlockingQueue的内部,可以移除指定元素

public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    // 遍历移除指定元素
                    if (o.equals(items[i])) {
                        // 移除指定元素 并更新对应的索引位置
                        removeAt(i);
                        return true;
                    }
                    // 防止越界
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
复制代码


take

take会造成线程阻塞下面我看下它的内部实现

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获取锁,当线程中断会直接返回
        lock.lockInterruptibly();
        try {
            // 如果元素内部为空 会进入阻塞,意思是没有元素可拿了,进入等待
            while (count == 0)
                // 使当前线程等待
                notEmpty.await();
            // 否则出列    
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
复制代码


put

该方法实现跟 take类似, 也会阻塞线程

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果元素满了 就阻塞
            while (count == items.length)
                notFull.await();
            // 否则入列    
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
复制代码


结束语

本节主要给大家讲了ArrayBlockingQueue的源码实现,它的源码相对简单一些, 大家可以根据本节看下BlockingQueue其它的实现类。下一节, 带大家学习一下并发容器 ~

相关文章
|
8月前
|
安全
阻塞队列的安全实现,定时器的安全实现(面试易考),超详细(一)
阻塞队列的安全实现,定时器的安全实现(面试易考),超详细
|
8月前
|
安全
阻塞队列的安全实现,定时器的安全实现(面试易考),超详细(二)
阻塞队列的安全实现,定时器的安全实现(面试易考),超详细
|
1月前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
1月前
|
安全 Java
【面试问题】Java 中的阻塞队列用过哪些?
【1月更文挑战第27天】【面试问题】Java 中的阻塞队列用过哪些?
|
8月前
|
设计模式 存储 安全
面试易考:多线程模式下的单例模式两种具体实现(饿汉,懒汉),两个的线程安全性,阻塞队列,生产者消费者模型
面试易考:多线程模式下的单例模式两种具体实现(饿汉,懒汉),两个的线程安全性,阻塞队列,生产者消费者模型
|
10月前
|
存储 消息中间件 安全
第二季:7阻塞队列知道吗?【Java面试题】
第二季:7阻塞队列知道吗?【Java面试题】
34 0
|
Java API
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(下)
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(下)
110 0
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(下)
|
安全 Java
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(上)
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(上)
94 0
2020大厂面试JUC线程重要技术点【集合+线程+阻塞队列+线程池】(上)
|
存储 消息中间件 Java
详解java中一个面试常问的知识点-阻塞队列
学习数据结构的时候介绍过队列,今天介绍一种队列的其中一种,叫做阻塞队列。这个知识点属于多线程中的一个模块,对于我们理解消息中间件有份非常大的用处,希望对你有帮助。
142 0
详解java中一个面试常问的知识点-阻塞队列