安全无忧:Java并发集合容器的应用与实践

简介: 安全无忧:Java并发集合容器的应用与实践

Java 常见并发容器

JDK 提供的这些容器大部分在 java.util.concurrent 包中:

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道,解决生产者消费者问题
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本 ConcurrentHashMap。

实现原理

在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

JDK1.7

底层数据结构:Segments 数组 + HashEntry 数组 + 链表,采用分段锁保证安全性。

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。

JDK1.8

底层数据结构:Synchronized + CAS + Node + 红黑树,Node 的 val 和 next 都用 volatile 保证,保证可见性。查找、替换、赋值操作都使用 CAS。

CAS 是乐观锁,在一些场景中(并发不激烈的情况下)它比 Synchronized 和 ReentrentLock 的效率要高,当 CAS 保障不了线程安全的情况下(扩容、hash 冲突)转成Synchronized 来保证线程安全,大大提高了低并发下的性能。

读取操作的实现
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get 操作全程无锁。get 操作可以无锁是由于 Node 元素的 val 和指针 next 是用 volatile 修饰的,在多线程环境下线程 A 修改节点的 val 或者新增节点的时候是对线程 B 可见的。

写入操作的实现
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 根据key的进行hash操作,找到Node数组中的位置
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
    // 因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 先判断Node数组有没有初始化,如果没有初始化先初始化initTable();
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 不存在hash冲突,即该位置是null,直接用CAS插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 存在hash冲突,对链表的头节点或者红黑树的头节点加synchronized锁,进一步减少线程冲突
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 如果是链表,就遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 如果key相同就执行覆盖操作
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果key不同就将元素插入到链表的尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果是红黑树,就按照红黑树的结构进行插入。
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表长度大于8, Node数组的长度超过64时,会将链表的转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 计数增加1,有可能触发transfer操作(扩容)。
    addCount(1L, binCount);
    return null;
}
  1. 根据 key 的进行 hash 操作,找到 Node 数组中的位置
  2. 然后加了一个死循环,意思是不断的尝试,因为在 table 的初始化和 casTabAt 用到了 compareAndSwapInt、compareAndSwapObject,因为如果其他线程正在修改table,那么尝试就会失败,所以要加一个 fo 循环,不断的尝试
  3. 之后判断 Node 数组有没有初始化,如果没有初始化先初始化 initTable
  4. 如果不存在 hash 冲突,即该位置是 null,直接用 CAS 插入,之后跳出循环
  5. 如果存在了冲突并且 hash 值为 MOVED(-1),说明该链表正在进行 transfer 操作,获取到扩容完成后的 table,进入下一次的循环
  6. 存在产生 hash 冲突,那么对链表的头节点或者红黑树的头节点加 synchronized 锁,进一步减少线程冲突。
  1. 如果是链表,就遍历链表,如果 key 相同就执行覆盖操作,key 不同就追加,之后跳出循环
  2. 如果是红黑树,就按照红黑树的结构进行插入,之后跳出循环
  1. 循环结束后判断链表长度是否大于8,当同时满足 Node 数组的长度超过 64 时,会将链表的转化为红黑树
  2. 最后计数增加 1,同时有可能触发 transfer 操作(扩容)

CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。

实现原理

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

读取操作的实现

private transient volatile Object[] array;
public E get(int index) {
    return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}
final Object[] getArray() {
    return array;
}

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

写入操作的实现

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

CopyOnWriteArrayList 写入操作在添加元素的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue 这个队列使用链表作为其数据结构,它在高并发环境中性能表现得非常好。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 底层是通过 CAS 非阻塞算法来实现线程的。

阻塞队列

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

阻塞队列的相关方法如下:

  • 如果将队列当作线程管理工具来使用,将要用到 put 和 take 方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add、remove 和 element 操作抛出异常。
  • 在一个多线程程序中, 队列会在任何时候空或满,因此,一定要使用 offer、poll 和 peek 方法作为替代。这些方法如果不能完成任务,只是给出一个错误提示而不会抛出异常。poll 和 peek 方法通过返回空来指示失败。因此,向这些队列中插入 null 值是非法的。

java.util.concurrent 包提供了阻塞队列的几个变种。下面主要介绍一下 3 个常见的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先得到处理。通常,公平性会降低性能,只有在确实非常需要时才使用它。

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

/**
 * 指定容量构造
 */
public ArrayBlockingQueue(int capacity) {
    // 不设置公平参数
    this(capacity, false);
}
/**
 * 指定容量和公平参数的构造,fair为true则表示公平,等待越长越优先
 * 底层通过公平锁实现
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 如果队列满,则阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果队列空,则阻塞
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足先进先出的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE

其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

LinkedBlockingDeque 相当于是 LinkedBlockingQueue 的变种,底层基于双向链表实现。

/**
 * 某种意义上的无界队列,容量是Integer.MAX_VALUE
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 指定容量构造,有界队列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,它不是一个先进先出的队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 底层基于数组实现,默认容量为 11。扩容机制:每次新增检测当前容量,已满的时候进行扩容,假设当前容量为 n,如果 n < 64,那么新容量是 2n+2,否则是 1.5n。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

DelayQueue:基于 PriorityQueue 的一种特殊队列,里面的元素只有在其到期时才能从队列中取出。


相关文章
|
4天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
23 2
|
14天前
|
存储 缓存 安全
Java 集合江湖:底层数据结构的大揭秘!
小米是一位热爱技术分享的程序员,本文详细解析了Java面试中常见的List、Set、Map的区别。不仅介绍了它们的基本特性和实现类,还深入探讨了各自的使用场景和面试技巧,帮助读者更好地理解和应对相关问题。
36 5
|
25天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
24天前
|
安全 Java 数据库连接
Java中的异常处理:理解与实践
在Java的世界里,异常处理是维护代码健壮性的守门人。本文将带你深入理解Java的异常机制,通过直观的例子展示如何优雅地处理错误和异常。我们将从基本的try-catch结构出发,探索更复杂的finally块、自定义异常类以及throw关键字的使用。文章旨在通过深入浅出的方式,帮助你构建一个更加稳定和可靠的应用程序。
31 5
|
27天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
115 6
|
24天前
|
安全 Java 程序员
Java内存模型的深入理解与实践
本文旨在深入探讨Java内存模型(JMM)的核心概念,包括原子性、可见性和有序性,并通过实例代码分析这些特性在实际编程中的应用。我们将从理论到实践,逐步揭示JMM在多线程编程中的重要性和复杂性,帮助读者构建更加健壮的并发程序。
|
25天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
29 2
|
27天前
|
存储 缓存 安全
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
39 4
|
27天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
61 1
|
28天前
|
SQL 安全 Java
Java 异常处理:筑牢程序稳定性的 “安全网”
本文深入探讨Java异常处理,涵盖异常的基础分类、处理机制及最佳实践。从`Error`与`Exception`的区分,到`try-catch-finally`和`throws`的运用,再到自定义异常的设计,全面解析如何有效管理程序中的异常情况,提升代码的健壮性和可维护性。通过实例代码,帮助开发者掌握异常处理技巧,确保程序稳定运行。
40 0

热门文章

最新文章