如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

如何保证容器是线程安全的?ConcurrentHashMap 如何高效的线程安全?



Java提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用Collections工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。其实可以利用并发包提供的线程安全容器。


  • 各种并发容器,比如 ConcurrentHashMap、CopyOnWriteArrayList
  • 各种线程安全队列(Queue/Deque),比如 ArrayBlockingQueue,SynchronousQueue
  • 各种有序容器的线程安全版本。


如何保证线程安全


首先要保障线程安全的几个基本特性, 原子性,可见性,有序性。其次可以通过封装的方式将内部对象保护起来,保证变量对象的不可变性,一般就线程安全了。

  • 理解基本的线程安全工具
  • 理解传统集合矿建并发变成中 Map 存在的问题,清楚简单同步方式的不足
  • 梳理并发包内,尤其是 ConcurrentHashMap 采取了哪些方法来提高并发表现。
  • 最好能够掌握 ConcurrentHashMap 自身的演进,目前很多分析资料还是基于早期版本。


为什么需要 ConcurrentHashMap


Hashtable 是怎样实现线程安全的。


Hashtable 能够保证线程安全,但是它的基本就是将 put ,get ,size 等各种操作加上 synchronized, 这样就导致了所有并发操作都要竞争一把锁,一个线程在进行同步操作时,其他线程只能等待,大大减低了并发效率。


@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}
public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();
    }
    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    @SuppressWarnings("unchecked")
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }
    addEntry(hash, key, value, index);
    return null;
}

SynchronizedMap 是如何实现线程安全的?


SynchronizedMap 并没有声明 synchronized 方法,但是还是利用了互斥的 mutex ,相对于 hashtable 没有真正意义上的改进。


private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;
        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize
        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }
        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }
        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }
        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }
        private transient Set<K> keySet;
        private transient Set<Map.Entry<K,V>> entrySet;
        private transient Collection<V> values;
        public Set<K> keySet() {
            synchronized (mutex) {
                if (keySet==null)
                    keySet = new SynchronizedSet<>(m.keySet(), mutex);
                return keySet;
            }
        }
        public Set<Map.Entry<K,V>> entrySet() {
            synchronized (mutex) {
                if (entrySet==null)
                    entrySet = new SynchronizedSet<>(m.entrySet(), mutex);
                return entrySet;
            }
        }
}

hashtable 或者同步包装版本,都只适合在非高度并发的场景下。


ConcurrentHashMap 是如何设计实现的?


ConcurrentHashMap 为什么能够大大提高并发效率?ConcurrentHashMap 的设计一直在演化,比如在 Java 8 中就发生发生了很大变化。

  • 分离锁,也就是将内部进行分段( Segment),里面则是 HashEntry的数组,和 HashMap类似,哈希相同的条目也是以链表形式存放。
  • HashEntry 内部使用 volatile的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe中的很多操作都是 VM intrinsic优化过的。

早期的 ConcurrentHashMap 实现


640.png

JDK 7 ConcurrentHashMap 是如何 get 的?


public v get(object key){
  Segment<k, V> s ;// manually integrate access methods to reduce overhead
  HashEntry<k, v>[] tab:
  int h= hash(key.hashcode());
  //利用操作替换普迅数字运算
  long u = (((h >> segmentshift)& segmentMask)<< SSHIFT)+ SBASE
  //以 segment为单位,进行定位
  //利用 insafe直接 Volatile access
  if((s =(Segment<k, V>)UNSAFE.getobjectvolatile(segments, u)) != null &&(tab = s.table) != null){
  }
  // 略
  return null;
}

JDK 7 ConcurrentHashMap 是如何 put 的?


public v put(K key, v value){
    segment<K, v> 5;
    if(value =s null)
      throw new NullPointerException()
    //二次哈希,以保证数分散性,避免哈希突
    int hash = hash(key.hashcode());
    int j =(hash >>>segmentshift)& segnentMask ;
    if((s =(Segment<k, V>) UNSAFE.getobject
    // nonvolatile:recheck
    (segments, (j < SSHIFT)+ SBASE))= null)// in ensuresegm     return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 无论如何,确保获取锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            // 更新已有的value
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    // 放着hashEntry 到特定位置,如果超过阀值,进行 rehash
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

JDK 7 实现的ConcurrentHashMap 是居于分段锁技术实现的


  • ConcurrentHashMap会获取再入锁,以保证数据一致性, Segment本身就是基于 ReentrantLock的扩展实现,所以,在并发修改期间,相应 Segment是被锁定的
  • 在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突是 ConcurrentHash Map的常见技巧我在专栏上一讲介绍 HashMap时,提到了可能发生的扩容问题,在 ConcurrentHashMap 中同样存在。不过有一个明显区别,就是它进行的不是整体的扩容,而是单独对 Segmen进行扩容,细节就不介绍了。
  • 另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。试想,如果不进行同步,简单的计算所有 Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有 Segment进行计算,就会变得非常昂贵。其实,分离锁也限制了Map的初始化等操作。


JDK 7  ConcurrentHashMap 是如何获得 size 的


public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                // 实现重试机制 ,试图获得可靠值,如果监控到发生变化,就直接返回,否则就获取锁进行操作,
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // 监控是否有变化
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

ConcurrentHashMap的实现是通过重试机制( RETRIES_ BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对比 Segment.modCount),就直接返回,否则获取锁进行操作。


Java 8 之后的版本 ConcurrentHash 发生了哪些变化?


  • 总体结构上,它的内部存储变得和我在专栏上一讲介绍的 HashMap结构非常相似,同样是大的桶( bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要细致一些,还是数组+链表结构。
  • 其内部仍然有 Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
  • 因为不再使用 Segment,初始化操作大大简化,修改为lazy-oad形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点
  • 数据存储利用 volatile来保证可见性。
  • 使用 CAS 等操作,在特定场景进行无锁并发操作
  • 使用 Unsafe、 LongAdder之类底层手段,进行极端情况的优化。


JDK 8 数据存储内部实现


我们可以发现Key是final的,因为在生命周期中,一个条目的Key发生变化是不可能的;与此同时value,则声明为 volatile,以保证可见性。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

JDK 8 ConcurrentHashMap 是怎么 get 的?


实现相对简单,先找到哪个节点,然后,在链表中遍历查找。


public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

JDK 8 ConcurrentHashMap 是怎么 put 的 ?


public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
               // 利用CAS 去进行无锁线程安全操作 如果 bin 是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    // 细粒度的同步修改操作
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //  Bin 链表超过阀值,进行树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

可以看到在同步逻辑上,它使用的是 synchronized ,不是 JDK 7 的 ReentrantLock 之类,为什么?JDK 对 synchronized 进行了不断优化,不在需要过分担心性能差异,相对于 ReentrantLock,可以减少内存消耗,是个非常大的优势。


JDK 8 ConcurentHashMap 是怎么进行 inittable 的?


private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

这里用的是一个典型的 CAS 使用场景,利用 volatile 的 sizectl 作为互斥手段,如果发现竞争性的初始化,那么spin, 等待条件恢复,否则利用 CAS 设置排他标准时,如果成功那么初始化,否则重试。while循环就是这个功效,


JDK 8 ConcurrentHashMap size() 方法


public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
  final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
 static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
  }


计数思路还是一样的,都是分而治之进行计数,然后求和处理。可以看到 CounterCell 的操作 ,是基于 java.util.concurrent.atomic.LongAdder 进行的,是个比较高效的线程安全计数实现,大多数情况下,建议使用 ActomicLong。相对于 JDK 7 中的实现,没有重试机制, JDK 8 中 put 或者 clear 方法,remove 中有 addCount() 方法 + CounterCell 能得精确的size。


相关文章
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
1月前
|
传感器 数据处理 定位技术
多线程;顺序容器;智能指针
【10月更文挑战第14天】多线程的创建创建线程比较简单,C++提供头文件thread,使用std的thread实例化一个线程对象创建。 std::thread 在 #include 头文件中声明,因此使用 std::thread 时需要包含 #include 头文件。
|
1月前
|
传感器 数据处理 定位技术
多线程;顺序容器;智能指针
多线程的创建创建线程比较简单,C++提供头文件thread,使用std的thread实例化一个线程对象创建。 std::thread 在 #include 头文件中声明,因此使用 std::thread 时需要包含 #include 头文件。 #include &lt;iostream&gt; #include &lt;thread&gt; #include &lt;stdlib.h&gt; //sleep using namespace std; void t1() //普通的函数,用来执行线程 { for (int i = 0; i &lt; 10; ++i)
多线程;顺序容器;智能指针
|
2月前
|
传感器 数据处理 定位技术
多线程;顺序容器;智能指针
多线程的创建创建线程比较简单,C++提供头文件thread,使用std的thread实例化一个线程对象创建。 std::thread 在 #include 头文件中声明,因此使用 std::thread 时需要包含 #include 头文件。 #include &lt;iostream&gt; #include &lt;thread&gt; #include &lt;stdlib.h&gt; //sleep using namespace std; void t1() //普通的函数,用来执行线程 { for (int i = 0; i &lt; 10; ++i)
|
3月前
|
监控 安全 Java
Java多线程调试技巧:如何定位和解决线程安全问题
Java多线程调试技巧:如何定位和解决线程安全问题
140 2
|
3月前
|
存储 Java 开发者
HashMap线程安全问题大揭秘:ConcurrentHashMap、自定义同步,一文让你彻底解锁!
【8月更文挑战第24天】HashMap是Java集合框架中不可或缺的一部分,以其高效的键值对存储和快速访问能力广受开发者欢迎。本文深入探讨了HashMap在JDK 1.8后的底层结构——数组+链表+红黑树混合模式,这种设计既利用了数组的快速定位优势,又通过链表和红黑树有效解决了哈希冲突问题。数组作为基石,每个元素包含一个Node节点,通过next指针形成链表;当链表长度过长时,采用红黑树进行优化,显著提升性能。此外,还介绍了HashMap的扩容机制,确保即使在数据量增大时也能保持高效运作。通过示例代码展示如何使用HashMap进行基本操作,帮助理解其实现原理及应用场景。
57 1
|
3月前
|
安全 算法 Java
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
这篇文章讨论了Java集合类的线程安全性,列举了线程不安全的集合类(如HashSet、ArrayList、HashMap)和线程安全的集合类(如Vector、Hashtable),同时介绍了Java 5之后提供的java.util.concurrent包中的高效并发集合类,如ConcurrentHashMap和CopyOnWriteArrayList。
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
|
2月前
|
安全 Java
LinkedBlockingQueue 是线程安全的,为什么会有两个线程都take()到同一个对象了?
LinkedBlockingQueue 是线程安全的,为什么会有两个线程都take()到同一个对象了?
47 0
|
3月前
|
存储 安全 Java
【多线程面试题十七】、如果不使用synchronized和Lock,如何保证线程安全?
这篇文章探讨了在不使用`synchronized`和`Lock`的情况下保证线程安全的方法,包括使用`volatile`关键字、原子变量、线程本地存储(`ThreadLocal`)以及设计不可变对象。
|
13天前
|
Kubernetes Cloud Native Docker
云原生时代的容器化实践:Docker和Kubernetes入门
【10月更文挑战第37天】在数字化转型的浪潮中,云原生技术成为企业提升敏捷性和效率的关键。本篇文章将引导读者了解如何利用Docker进行容器化打包及部署,以及Kubernetes集群管理的基础操作,帮助初学者快速入门云原生的世界。通过实际案例分析,我们将深入探讨这些技术在现代IT架构中的应用与影响。
54 2
下一篇
无影云桌面