并发编程之ConcurrentHashMap jdk1.7和1.8源码剖析(一)

简介: 并发编程之ConcurrentHashMap jdk1.7和1.8源码剖析

一、背景:

线程不安全的HashMap

   因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。

效率低下的HashTable容器

    HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

二、应用场景

CHM适用于读者数量超过写者时,当写者数量大于等于读者时,CHM的性能是低于Hashtable和synchronized Map的。这是因为当锁住了整个Map时,读操作要等待对同一部分执行写操作的线程结束。CHM适用于做cache,在程序启动时初始化,之后可以被多个请求线程访问。正如Javadoc说明的那样,CHM是HashTable一个很好的替代,但要记住,CHM的比HashTable的同步性稍弱。

三、源码分析:

3.1 jdk1.7的源码

3.1.1锁分段技术

   HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

ConcurrentHashMap的数据结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

3.1.2 ConcurrentHashMap的主要数据结构

   ConcurrentHashMap(1.7及之前)中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系

ConcurrentHashMap的成员变量

    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的并发等级(下面会叙述作用)
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment数量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment数量
    static final int MAX_SEGMENTS = 1 << 16; 
    static final int RETRIES_BEFORE_LOCK = 2;
    /**
     * Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组
     * 每个Segment相当于一个子Hash表
     */
    final Segment<K,V>[] segments;
    /**
     * segmentMask和segmentShift主要是为了定位段
     */
    final int segmentMask;
    final int segmentShift;

Segment的结构

 static final class Segment<K,V> extends ReentrantLock implements Serializable {
        //volatile,这使得能够读取到最新的 table值而不需要同步
        transient volatile HashEntry<K,V>[] table;
        //count用来统计该段数据的个数
        transient int count;
        //modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变
        transient int modCount;
        //threashold用来表示需要进行rehash的界限值
        transient int threshold;
        //loadFactor表示负载因子。
        final float loadFactor;
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
           //略
        }
        private void rehash(HashEntry<K,V> node) {
          //略
        }
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
           //略
        }
        private void scanAndLock(Object key, int hash) {
          //略
        }
        final V remove(Object key, int hash, Object value) {
           //略
        }
        final boolean replace(K key, int hash, V oldValue, V newValue) {
            //略
        }
        final V replace(K key, int hash, V value) {
            //略
        }
        final void clear() {
            //略
        }
    }

HashEntry的结构

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

3.1.3 hash槽的的个数

  为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。

3.1.4 定位操作:

    private Segment<K,V> segmentForHash(int h) {
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
    }

  既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。

再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。

默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。

3.1.5remove(key)操作

    /**
     * 先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,
     * 只要它们所在的段不相同,它们就可以同时进行。
     */
    public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

下面是Segment的remove方法实现:

        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
               //pred用来记录待每次循环的前一个节点
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    //当找到了待删除及节点的位置
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                          //如果待删除节点的前节点为null,即待删除节点时链头节点,此时把第2 
                            //个结点设为头结点
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                              //如果有前节点,则待删除节点的前节点的next指向待删除节点的的下 
                                //一个节点,删除成功
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

3.1.5 get操作

public V get(Object key) {
    Segment<K,V> s; 
     HashEntry<K,V>[] tab;
     int h = hash(key);
     long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
     if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h))              << TSHIFT) + TBASE);e != null; e = e.next) {
                      K k;
                      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                                  return e.value;
                     }
      }
      return null;
}

       get 逻辑比较简单:只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。

       由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

       ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。

3.1.6 put操作

put操作是委托给段的put方法。下面是段的put方法:

 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
           // 1. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry
            HashEntry<K,V> node = tryLock() ? null :
              // 尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut()
                //自旋获取锁。
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        // 2. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 
                        //    是否相等,相等则覆盖旧的 value
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    // 3. 为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

  由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。

  • 是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
  • 如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

3.1.7 containsKey和containsValue

public boolean containsKey(Object key) {
        Segment<K,V> s; // same as get() except no need for volatile value read
        HashEntry<K,V>[] tab;
        int h = hash(key);
        //根据key定位到segment,遍历HashEntry判断是否具有key
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return true;
            }
        }
        return false;
    }
    public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
              //如果自旋的次数超过RETRIES_BEFORE_LOCK,强制堆所有segments加锁
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                //遍历Segment
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            //遍历HashEntry
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }


相关文章
|
6月前
|
安全 前端开发 Java
JDK源码级别彻底剖析JVM类加载机制
JDK源码级别彻底剖析JVM类加载机制
|
6月前
|
缓存 Dubbo Java
趁同事上厕所的时间,看完了 Dubbo SPI 的源码,瞬间觉得 JDK SPI 不香了
趁同事上厕所的时间,看完了 Dubbo SPI 的源码,瞬间觉得 JDK SPI 不香了
|
2月前
|
监控 Java 开发者
【并发编程的终极简化】JDK 22结构化并发:让并发编程变得像写代码一样简单!
【9月更文挑战第8天】随着JDK 22的发布,结构化并发为Java编程带来了全新的并发编程体验。它不仅简化了并发编程的复杂性,提高了程序的可靠性和可观察性,还为开发者们提供了更加高效、简单的并发编程方式。我们相信,在未来的发展中,结构化并发将成为Java并发编程的主流方式之一,推动Java编程语言的进一步发展。让我们共同期待Java在并发编程领域的更多创新和突破!
|
3月前
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
53 2
|
5月前
|
Java Spring
深入解析Spring源码,揭示JDK动态代理的工作原理。
深入解析Spring源码,揭示JDK动态代理的工作原理。
59 0
|
6月前
|
设计模式 Java
根据JDK源码Calendar来看工厂模式和建造者模式
根据JDK源码Calendar来看工厂模式和建造者模式
|
6月前
|
算法 Java 索引
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
67 0
|
6月前
|
Java Linux iOS开发
Spring5源码(27)-静态代理模式和JDK、CGLIB动态代理
Spring5源码(27)-静态代理模式和JDK、CGLIB动态代理
48 0
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
127 1
|
6月前
|
算法 安全 Java
ConcurrentLinkedQueue的源码解析(基于JDK1.8)
ConcurrentLinkedQueue的源码解析(基于JDK1.8) ConcurrentLinkedQueue是Java集合框架中的一种线程安全的队列,它是通过CAS(Compare and Swap)算法实现的并发队列。在并发场景下,ConcurrentLinkedQueue能够保证队列的线程安全性,同时性能也很不错。