安全无忧:Java并发集合容器的应用与实践

简介: 安全无忧:Java并发集合容器的应用与实践

Java 常见并发容器

JDK 提供的这些容器大部分在 java.util.concurrent 包中:

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道,解决生产者消费者问题
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本 ConcurrentHashMap。

实现原理

在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

JDK1.7

底层数据结构:Segments 数组 + HashEntry 数组 + 链表,采用分段锁保证安全性。

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。

JDK1.8

底层数据结构:Synchronized + CAS + Node + 红黑树,Node 的 val 和 next 都用 volatile 保证,保证可见性。查找、替换、赋值操作都使用 CAS。

CAS 是乐观锁,在一些场景中(并发不激烈的情况下)它比 Synchronized 和 ReentrentLock 的效率要高,当 CAS 保障不了线程安全的情况下(扩容、hash 冲突)转成Synchronized 来保证线程安全,大大提高了低并发下的性能。

读取操作的实现
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get 操作全程无锁。get 操作可以无锁是由于 Node 元素的 val 和指针 next 是用 volatile 修饰的,在多线程环境下线程 A 修改节点的 val 或者新增节点的时候是对线程 B 可见的。

写入操作的实现
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 根据key的进行hash操作,找到Node数组中的位置
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
    // 因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 先判断Node数组有没有初始化,如果没有初始化先初始化initTable();
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 不存在hash冲突,即该位置是null,直接用CAS插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 存在hash冲突,对链表的头节点或者红黑树的头节点加synchronized锁,进一步减少线程冲突
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 如果是链表,就遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 如果key相同就执行覆盖操作
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果key不同就将元素插入到链表的尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果是红黑树,就按照红黑树的结构进行插入。
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表长度大于8, Node数组的长度超过64时,会将链表的转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 计数增加1,有可能触发transfer操作(扩容)。
    addCount(1L, binCount);
    return null;
}
  1. 根据 key 的进行 hash 操作,找到 Node 数组中的位置
  2. 然后加了一个死循环,意思是不断的尝试,因为在 table 的初始化和 casTabAt 用到了 compareAndSwapInt、compareAndSwapObject,因为如果其他线程正在修改table,那么尝试就会失败,所以要加一个 fo 循环,不断的尝试
  3. 之后判断 Node 数组有没有初始化,如果没有初始化先初始化 initTable
  4. 如果不存在 hash 冲突,即该位置是 null,直接用 CAS 插入,之后跳出循环
  5. 如果存在了冲突并且 hash 值为 MOVED(-1),说明该链表正在进行 transfer 操作,获取到扩容完成后的 table,进入下一次的循环
  6. 存在产生 hash 冲突,那么对链表的头节点或者红黑树的头节点加 synchronized 锁,进一步减少线程冲突。
  1. 如果是链表,就遍历链表,如果 key 相同就执行覆盖操作,key 不同就追加,之后跳出循环
  2. 如果是红黑树,就按照红黑树的结构进行插入,之后跳出循环
  1. 循环结束后判断链表长度是否大于8,当同时满足 Node 数组的长度超过 64 时,会将链表的转化为红黑树
  2. 最后计数增加 1,同时有可能触发 transfer 操作(扩容)

CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。

实现原理

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

读取操作的实现

private transient volatile Object[] array;
public E get(int index) {
    return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}
final Object[] getArray() {
    return array;
}

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

写入操作的实现

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

CopyOnWriteArrayList 写入操作在添加元素的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue 这个队列使用链表作为其数据结构,它在高并发环境中性能表现得非常好。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 底层是通过 CAS 非阻塞算法来实现线程的。

阻塞队列

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

阻塞队列的相关方法如下:

  • 如果将队列当作线程管理工具来使用,将要用到 put 和 take 方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add、remove 和 element 操作抛出异常。
  • 在一个多线程程序中, 队列会在任何时候空或满,因此,一定要使用 offer、poll 和 peek 方法作为替代。这些方法如果不能完成任务,只是给出一个错误提示而不会抛出异常。poll 和 peek 方法通过返回空来指示失败。因此,向这些队列中插入 null 值是非法的。

java.util.concurrent 包提供了阻塞队列的几个变种。下面主要介绍一下 3 个常见的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先得到处理。通常,公平性会降低性能,只有在确实非常需要时才使用它。

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

/**
 * 指定容量构造
 */
public ArrayBlockingQueue(int capacity) {
    // 不设置公平参数
    this(capacity, false);
}
/**
 * 指定容量和公平参数的构造,fair为true则表示公平,等待越长越优先
 * 底层通过公平锁实现
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 如果队列满,则阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果队列空,则阻塞
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足先进先出的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE

其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

LinkedBlockingDeque 相当于是 LinkedBlockingQueue 的变种,底层基于双向链表实现。

/**
 * 某种意义上的无界队列,容量是Integer.MAX_VALUE
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 指定容量构造,有界队列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,它不是一个先进先出的队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 底层基于数组实现,默认容量为 11。扩容机制:每次新增检测当前容量,已满的时候进行扩容,假设当前容量为 n,如果 n < 64,那么新容量是 2n+2,否则是 1.5n。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

DelayQueue:基于 PriorityQueue 的一种特殊队列,里面的元素只有在其到期时才能从队列中取出。


相关文章
|
16小时前
|
运维 监控 安全
构建高效自动化运维系统:基于容器技术的持续集成与持续部署(CI/CD)实践
【5月更文挑战第14天】 随着DevOps文化的深入人心,持续集成与持续部署(CI/CD)已成为现代软件工程不可或缺的组成部分。本文将探讨如何利用容器技术,尤其是Docker和Kubernetes,构建一个高效、可扩展的自动化运维系统。通过深入分析CI/CD流程的关键组件,我们将讨论如何整合这些组件以实现代码从提交到生产环境的快速、无缝过渡。文章还将涉及监控、日志管理以及安全性策略等运维考量,为读者提供一个全面的自动化运维解决方案蓝图。
|
16小时前
|
Java 程序员 调度
Java中的多线程编程:从理论到实践
【5月更文挑战第14天】在现代计算机技术中,多线程编程是一个重要的概念。它允许多个线程并行执行,从而提高程序的运行效率。本文将从理论和实践两个角度深入探讨Java中的多线程编程,包括线程的基本概念、创建和控制线程的方法,以及如何处理线程同步和通信问题。
|
16小时前
|
Kubernetes 网络协议 Java
容器服务Kubernetes版产品使用合集之遇到报错"java.lang.NoClassDefFoundError"如何解决
容器服务Kubernetes版,作为阿里云提供的核心服务之一,旨在帮助企业及开发者高效管理和运行Kubernetes集群,实现应用的容器化与微服务化。以下是关于使用这些服务的一些建议和合集,涵盖基本操作、最佳实践、以及一些高级功能的使用方法。
11 2
|
16小时前
|
Java 测试技术
Java一分钟之-正则表达式在Java中的应用
【5月更文挑战第14天】正则表达式是Java中用于文本处理的强大力量,通过`java.util.regex`包支持。常见问题包括元字符的理解、边界匹配和贪婪/懒惰量词的使用。错误通常涉及未转义特殊字符、不完整模式或过度匹配。要避免这些问题,需学习实践、使用在线工具和测试调试。示例代码展示了如何验证邮箱地址。掌握正则表达式需要不断练习和调试。
14 2
|
16小时前
|
运维 监控 Devops
构建高效稳定的云基础设施:DevOps与容器化技术融合实践
【5月更文挑战第14天】 在当今快速迭代的软件发展环境中,传统的IT运维模式已难以满足业务需求。本文探讨了如何通过DevOps文化和容器化技术的融合来构建一个高效且稳定的云基础设施。文中不仅分析了DevOps的核心理念及其对于提升运维效率的影响,还深入剖析了容器化技术如Docker和Kubernetes在自动化部署、弹性伸缩及微服务架构中的关键作用。此外,文章还将分享一系列实践经验,帮助读者理解如何在实际工作中将DevOps与容器化技术有效结合,以支持业务的敏捷性和可靠性。
17 2
|
16小时前
|
Kubernetes Cloud Native 持续交付
构建高效稳定的云原生应用:容器编排与微服务治理实践
【5月更文挑战第14天】 随着企业数字化转型的深入,云原生技术以其弹性、敏捷和可扩展的特性成为现代应用开发的首选模式。本文将探讨如何通过容器编排工具如Kubernetes以及微服务架构的有效治理,构建和维护高效且稳定的云原生应用。我们将分析容器化技术的优势,并结合案例讨论在多云环境下实现持续集成、持续部署(CI/CD)的最佳实践,同时解决微服务带来的分布式复杂性问题。通过本文的阐述,读者将获得一套提升系统可靠性和业务连续性的策略框架。
5 0
|
16小时前
|
运维 Kubernetes Devops
构建高效稳定的云基础设施:DevOps与容器化技术融合实践
【5月更文挑战第14天】 随着云计算的普及和企业数字化转型的深入,构建一个高效、稳定且能快速响应市场变化的云基础设施已成为众多组织的技术战略核心。本文将探讨如何通过DevOps文化和容器化技术的结合,实现自动化运维流程,提升服务部署效率,确保系统的可扩展性和高可用性。我们还将分析面临的挑战及解决方案,并展示在实际案例中的应用成果。
6 0
|
16小时前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于容器技术的持续集成与部署实践
【5月更文挑战第13天】 在现代软件开发周期中,持续集成(CI)和持续部署(CD)已成为提升开发效率、保障产品质量的关键环节。随着云计算和微服务架构的普及,容器技术如Docker和Kubernetes为运维领域带来了革命性的变革。本文旨在探讨如何利用容器技术构建一个高效、可靠的自动化运维体系,实现从代码提交到产品发布的全过程自动化管理。通过深入分析容器化技术的核心原理,结合实际案例,我们将阐述如何优化持续集成流程、确保自动化测试的覆盖率、以及实现无缝的持续部署。
14 2
|
16小时前
|
Java
Java中的多线程编程:基础知识与实践
【5月更文挑战第13天】在计算机科学中,多线程是一种使得程序可以同时执行多个任务的技术。在Java语言中,多线程的实现主要依赖于java.lang.Thread类和java.lang.Runnable接口。本文将深入探讨Java中的多线程编程,包括其基本概念、实现方法以及一些常见的问题和解决方案。
|
16小时前
|
Java 编译器 开发者
Java一分钟之-Java注解的理解与应用
【5月更文挑战第12天】本文介绍了Java注解的基础知识和常见应用,包括定义、应用和解析注解。注解在编译检查、框架集成和代码生成等方面发挥重要作用。文章讨论了两个易错点:混淆保留策略和注解参数类型限制,并提供了避免策略。提醒开发者避免过度使用注解,以保持代码清晰。理解并恰当使用注解能提升代码质量。
13 3