如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

如何保证容器是线程安全的?ConcurrentHashMap 如何高效的线程安全?



Java提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用Collections工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。其实可以利用并发包提供的线程安全容器。


  • 各种并发容器,比如 ConcurrentHashMap、CopyOnWriteArrayList
  • 各种线程安全队列(Queue/Deque),比如 ArrayBlockingQueue,SynchronousQueue
  • 各种有序容器的线程安全版本。


如何保证线程安全


首先要保障线程安全的几个基本特性, 原子性,可见性,有序性。其次可以通过封装的方式将内部对象保护起来,保证变量对象的不可变性,一般就线程安全了。

  • 理解基本的线程安全工具
  • 理解传统集合矿建并发变成中 Map 存在的问题,清楚简单同步方式的不足
  • 梳理并发包内,尤其是 ConcurrentHashMap 采取了哪些方法来提高并发表现。
  • 最好能够掌握 ConcurrentHashMap 自身的演进,目前很多分析资料还是基于早期版本。


为什么需要 ConcurrentHashMap


Hashtable 是怎样实现线程安全的。


Hashtable 能够保证线程安全,但是它的基本就是将 put ,get ,size 等各种操作加上 synchronized, 这样就导致了所有并发操作都要竞争一把锁,一个线程在进行同步操作时,其他线程只能等待,大大减低了并发效率。


@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}
public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();
    }
    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    @SuppressWarnings("unchecked")
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }
    addEntry(hash, key, value, index);
    return null;
}

SynchronizedMap 是如何实现线程安全的?


SynchronizedMap 并没有声明 synchronized 方法,但是还是利用了互斥的 mutex ,相对于 hashtable 没有真正意义上的改进。


private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;
        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize
        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }
        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }
        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }
        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }
        private transient Set<K> keySet;
        private transient Set<Map.Entry<K,V>> entrySet;
        private transient Collection<V> values;
        public Set<K> keySet() {
            synchronized (mutex) {
                if (keySet==null)
                    keySet = new SynchronizedSet<>(m.keySet(), mutex);
                return keySet;
            }
        }
        public Set<Map.Entry<K,V>> entrySet() {
            synchronized (mutex) {
                if (entrySet==null)
                    entrySet = new SynchronizedSet<>(m.entrySet(), mutex);
                return entrySet;
            }
        }
}

hashtable 或者同步包装版本,都只适合在非高度并发的场景下。


ConcurrentHashMap 是如何设计实现的?


ConcurrentHashMap 为什么能够大大提高并发效率?ConcurrentHashMap 的设计一直在演化,比如在 Java 8 中就发生发生了很大变化。

  • 分离锁,也就是将内部进行分段( Segment),里面则是 HashEntry的数组,和 HashMap类似,哈希相同的条目也是以链表形式存放。
  • HashEntry 内部使用 volatile的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe中的很多操作都是 VM intrinsic优化过的。

早期的 ConcurrentHashMap 实现


640.png

JDK 7 ConcurrentHashMap 是如何 get 的?


public v get(object key){
  Segment<k, V> s ;// manually integrate access methods to reduce overhead
  HashEntry<k, v>[] tab:
  int h= hash(key.hashcode());
  //利用操作替换普迅数字运算
  long u = (((h >> segmentshift)& segmentMask)<< SSHIFT)+ SBASE
  //以 segment为单位,进行定位
  //利用 insafe直接 Volatile access
  if((s =(Segment<k, V>)UNSAFE.getobjectvolatile(segments, u)) != null &&(tab = s.table) != null){
  }
  // 略
  return null;
}

JDK 7 ConcurrentHashMap 是如何 put 的?


public v put(K key, v value){
    segment<K, v> 5;
    if(value =s null)
      throw new NullPointerException()
    //二次哈希,以保证数分散性,避免哈希突
    int hash = hash(key.hashcode());
    int j =(hash >>>segmentshift)& segnentMask ;
    if((s =(Segment<k, V>) UNSAFE.getobject
    // nonvolatile:recheck
    (segments, (j < SSHIFT)+ SBASE))= null)// in ensuresegm     return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 无论如何,确保获取锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            // 更新已有的value
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    // 放着hashEntry 到特定位置,如果超过阀值,进行 rehash
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

JDK 7 实现的ConcurrentHashMap 是居于分段锁技术实现的


  • ConcurrentHashMap会获取再入锁,以保证数据一致性, Segment本身就是基于 ReentrantLock的扩展实现,所以,在并发修改期间,相应 Segment是被锁定的
  • 在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突是 ConcurrentHash Map的常见技巧我在专栏上一讲介绍 HashMap时,提到了可能发生的扩容问题,在 ConcurrentHashMap 中同样存在。不过有一个明显区别,就是它进行的不是整体的扩容,而是单独对 Segmen进行扩容,细节就不介绍了。
  • 另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。试想,如果不进行同步,简单的计算所有 Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有 Segment进行计算,就会变得非常昂贵。其实,分离锁也限制了Map的初始化等操作。


JDK 7  ConcurrentHashMap 是如何获得 size 的


public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                // 实现重试机制 ,试图获得可靠值,如果监控到发生变化,就直接返回,否则就获取锁进行操作,
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // 监控是否有变化
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

ConcurrentHashMap的实现是通过重试机制( RETRIES_ BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对比 Segment.modCount),就直接返回,否则获取锁进行操作。


Java 8 之后的版本 ConcurrentHash 发生了哪些变化?


  • 总体结构上,它的内部存储变得和我在专栏上一讲介绍的 HashMap结构非常相似,同样是大的桶( bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要细致一些,还是数组+链表结构。
  • 其内部仍然有 Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
  • 因为不再使用 Segment,初始化操作大大简化,修改为lazy-oad形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点
  • 数据存储利用 volatile来保证可见性。
  • 使用 CAS 等操作,在特定场景进行无锁并发操作
  • 使用 Unsafe、 LongAdder之类底层手段,进行极端情况的优化。


JDK 8 数据存储内部实现


我们可以发现Key是final的,因为在生命周期中,一个条目的Key发生变化是不可能的;与此同时value,则声明为 volatile,以保证可见性。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

JDK 8 ConcurrentHashMap 是怎么 get 的?


实现相对简单,先找到哪个节点,然后,在链表中遍历查找。


public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

JDK 8 ConcurrentHashMap 是怎么 put 的 ?


public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
               // 利用CAS 去进行无锁线程安全操作 如果 bin 是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    // 细粒度的同步修改操作
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //  Bin 链表超过阀值,进行树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

可以看到在同步逻辑上,它使用的是 synchronized ,不是 JDK 7 的 ReentrantLock 之类,为什么?JDK 对 synchronized 进行了不断优化,不在需要过分担心性能差异,相对于 ReentrantLock,可以减少内存消耗,是个非常大的优势。


JDK 8 ConcurentHashMap 是怎么进行 inittable 的?


private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

这里用的是一个典型的 CAS 使用场景,利用 volatile 的 sizectl 作为互斥手段,如果发现竞争性的初始化,那么spin, 等待条件恢复,否则利用 CAS 设置排他标准时,如果成功那么初始化,否则重试。while循环就是这个功效,


JDK 8 ConcurrentHashMap size() 方法


public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
  final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
 static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
  }


计数思路还是一样的,都是分而治之进行计数,然后求和处理。可以看到 CounterCell 的操作 ,是基于 java.util.concurrent.atomic.LongAdder 进行的,是个比较高效的线程安全计数实现,大多数情况下,建议使用 ActomicLong。相对于 JDK 7 中的实现,没有重试机制, JDK 8 中 put 或者 clear 方法,remove 中有 addCount() 方法 + CounterCell 能得精确的size。


相关文章
|
16天前
|
缓存 监控 安全
Java的线程池和线程安全
Java的线程池和线程安全
|
10天前
|
安全 Java 大数据
Java性能优化(七)-多线程调优-并发容器的使用
Java性能优化(七)-多线程调优-并发容器的使用
17 0
|
11天前
|
安全 Java 容器
多线程(进阶四:线程安全的集合类)
多线程(进阶四:线程安全的集合类)
15 0
|
17天前
|
安全 Linux 编译器
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(下)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
22 0
|
17天前
|
安全 C语言 C++
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(中)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
20 0
|
17天前
|
Linux 调度 C语言
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(上)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
23 0
|
18天前
|
安全 Java 编译器
Java多线程基础-6:线程安全问题及解决措施,synchronized关键字与volatile关键字(一)
线程安全问题是多线程编程中最典型的一类问题之一。如果多线程环境下代码运行的结果是符合我们预期的,即该结果正是在单线程环境中应该出现的结果,则说这个程序是线程安全的。 通俗来说,线程不安全指的就是某一代码在多线程环境下执行会出现bug,而在单线程环境下执行就不会。线程安全问题本质上是由于线程之间的调度顺序的不确定性,正是这样的不确定性,给我们的代码带来了很多“变数”。 本文将对Java多线程编程中,线程安全问题展开详细的讲解。
36 0
|
18天前
|
编解码 安全 算法
Java多线程基础-18:线程安全的集合类与ConcurrentHashMap
如果这些单线程中的集合类确实需要在多线程中使用,该怎么办呢?思路有两个: 最直接的方式:使用锁,手动保证。如多个线程修改ArrayList对象,此时就可能有问题,就可以给修改操作进行加锁。但手动加锁的方式并不是很方便,因此标准库还提供了一些线程安全的集合类。
31 4
|
3天前
|
运维 Ubuntu Docker
深入理解容器化技术:Docker的应用与实践
在这个数字化转型迅速推进的时代,容器化技术为软件开发和部署提供了新的路径。本文将深入探讨Docker技术的基本原理、应用场景以及实际操作,旨在帮助读者全面理解并掌握这一关键技术。
24 2
|
3天前
|
Docker 容器
蓝易云 - Docker修改容器ulimit的全部方案及各方案的详细步骤
以上就是修改Docker容器ulimit的全部方案及其详细步骤。
9 2