java并发面试常识之ArrayBlockingQueue

简介: ArrayBlockingQueue是常用的线程集合,在线程池中也常常被当做任务队列来使用。使用频率特别高。他是维护的是一个循环队列(基于数组实现),循环结构在数据结构中比较常见,但是在源码实现中还是比较少见的。

ArrayBlockingQueue是常用的线程集合,在线程池中也常常被当做任务队列来使用。使用频率特别高。他是维护的是一个循环队列(基于数组实现),循环结构在数据结构中比较常见,但是在源码实现中还是比较少见的。

线程安全的实现

线程安全队列,基本是离不开锁的。ArrayBlockingQueue使用的是ReentrantLock,配合两种Condition,实现了集合的线程安全操作。这里稍微说一个好习惯,下面是成员变量的声明。


    private static final long serialVersionUID = -817911632652898426L;
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    transient Itrs itrs = null;

赋值的操作基本都是在构造函数里做的。这样有个好处,代码执行可控。成员变量的初始化也是会合并在构造方法里执行的,但是在执行顺序上需要好好斟酌,如果写在构造方法里初始化,则没有相关问题。

阻塞队列的常用场所就是生产者消费者。一般都是生产者放入,消费者从头取数据。下面重点说这两个操作。
这两个操作都是依靠锁来保证线程安全的。

生产操作

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

put等放入操作,首先是获取锁,如果发现数据满了,就通过notFull的condition,来阻塞线程。这里的条件判定一定是用while而不是if,多线程情况下,可以被唤醒后发现又满了。

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

这个是入队列的操作。首先获取维护的数组。putindex就是放入操作的标志。这个操作会一直加。达到预定的长度后就变成0从头开始计数。这样插入的操作就是一个循环的操作了,count就是用来做计数的,作为能否插入数据的一个标准,插入数据后就通过notEmpty的condition发出一个信号唤醒消费线程。

消费操作

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  

消费的方法也是这样。先获取锁,然后进行条件判断,如果没有数据,则阻塞线程。注意点和put一样。

 private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

取数据的时候,也依靠takeIndex,这是一个标志,这个数值也会一直增加,表示取的第一个数据的位置。如果这个标志走到最后,然后变成0,从头再来。这样保证取出的数据都是fifo的顺序。删除的时候如果发现迭代中,则会修改迭代器的遍历。然后通过notFull的condition来唤醒生产线程。

移除操作

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

对于remove操作就比较麻烦了,首先获取锁之后,把两个标志位本地化,然后找到要删除的元素的位置。调用removeAt,这里删除需要对标志位做改变。

    void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

如果删除的元素是位置和takeindex一样。那就可以直接删除,然后让删除标志位向后移动。如果不是,则从删除的位置开始,进行后面向前面的数据覆盖的操作。直到遇到putindex的前一个位置。然后把那个位置的数据设置为null。并且把putindex的位置往前移动一格,正在迭代的时候要删除数据并且唤醒生产线程。

目录
相关文章
|
21天前
|
机器学习/深度学习 人工智能 数据挖掘
人工智能面试常识-10
人工智能面试常识-10
36 0
|
21天前
|
Java 程序员
java线程池讲解面试
java线程池讲解面试
39 1
|
1天前
|
安全 Java
就只说 3 个 Java 面试题 —— 02
就只说 3 个 Java 面试题 —— 02
10 0
|
1天前
|
存储 安全 Java
就只说 3 个 Java 面试题
就只说 3 个 Java 面试题
7 0
|
3天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
3 0
|
3天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
21 0
|
11天前
|
Java 关系型数据库 MySQL
大厂面试题详解:Java抽象类与接口的概念及区别
字节跳动大厂面试题详解:Java抽象类与接口的概念及区别
34 0
|
11天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
19天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
20天前
|
存储 缓存 算法
Java入门高频考查基础知识4(字节跳动面试题18题2.5万字参考答案)
最重要的是保持自信和冷静。提前准备,并对自己的知识和经验有自信,这样您就能在面试中展现出最佳的表现。祝您面试顺利!Java 是一种广泛使用的面向对象编程语言,在软件开发领域有着重要的地位。Java 提供了丰富的库和强大的特性,适用于多种应用场景,包括企业应用、移动应用、嵌入式系统等。下是几个面试技巧:复习核心概念、熟悉常见问题、编码实践、项目经验准备、注意优缺点、积极参与互动、准备好问题问对方和知其所以然等,多准备最好轻松能举一反三。
46 0
Java入门高频考查基础知识4(字节跳动面试题18题2.5万字参考答案)