并发编程之ConcurrentHashMap jdk1.7和1.8源码剖析(二)

简介: 并发编程之ConcurrentHashMap jdk1.7和1.8源码剖析

3.1.8 size()操作

  如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。

  因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2(RETRIES_BEFORE_LOCK)次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

 那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
              //尝试2 次通过不锁住Segment的方式来统计各个Segment大小
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                //如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment 
                 //的大小。
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

详细源码注释可以看

lzh-technology/src/main/java/com/lzhsite/technology/collections/hashMap/myConcurrentHashMap/ConcurrentHashMap7.java · lzhcode/maven-parent - Gitee.com 

3.2 jdk1.8的源码

3.2.1主要数据结构

    1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:

ConcurrentHashMap在put操作时,如果发现链表结构中的元素超过了TREEIFY_THRESHOLD(默认为8),则会把链表转换为红黑树,已便于提高查询效率。代码如下:

if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);

ConcurrentHashMap,提供了许多内部类来进行辅助实现,如Node,TreeNode,TreeBin等等。下面我们就一起来看看ConcurrentHashMap几个重要的内部类。

ConcurrentHashMap的成员变量

  // 最大容量:2^30=1073741824
  private static final int MAXIMUM_CAPACITY = 1 << 30;
  // 默认初始值,必须是2的幕数
  private static final int DEFAULT_CAPACITY = 16;
  //
  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  //
  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  //
  private static final float LOAD_FACTOR = 0.75f;
  // 链表转红黑树阀值,> 8 链表转换为红黑树
  static final int TREEIFY_THRESHOLD = 8;
  // 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数
        //量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
  static final int UNTREEIFY_THRESHOLD = 6;
  //
  static final int MIN_TREEIFY_CAPACITY = 64;
  //
  private static final int MIN_TRANSFER_STRIDE = 16;
  //
  private static int RESIZE_STAMP_BITS = 16;
  // 2^15-1,help resize的最大线程数
  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
  // 32-16=16,sizeCtl中记录size大小的偏移量
  private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  // forwarding nodes的hash值
  static final int MOVED = -1;
  // 树根节点的hash值
  static final int TREEBIN = -2;
  // ReservationNode的hash值
  static final int RESERVED = -3;
  // 可用处理器数量
  static final int NCPU = Runtime.getRuntime().availableProcessors();
  // 用来存放Node节点数据的,默认为null,默认大小为16的数组,每次扩容时大小总是2的幂次方;
  transient volatile Node<K, V>[] table;
  // 扩容时新生成的数据,数组为table的两倍;
  private transient volatile Node<K, V>[] nextTable;

Node

在Node内部类中,其属性value、next都是带有volatile的。同时其对value的setter方法进行了特殊处理,不允许直接调用其setter方法来修改value的值。最后Node还提供了find方法来赋值map.get()。

static class Node<K, V> implements Map.Entry<K, V> {
    final int hash;
    final K key;
                //get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。
    volatile V val;
    volatile Node<K, V> next;
    Node(int hash, K key, V val, Node<K, V> next) {
      this.hash = hash;
      this.key = key;
      this.val = val;
      this.next = next;
    }
    public final K getKey() {
      return key;
    }
    public final V getValue() {
      return val;
    }
    public final int hashCode() {
      return key.hashCode() ^ val.hashCode();
    }
    public final String toString() {
      return key + "=" + val;
    }
    public final V setValue(V value) {
      throw new UnsupportedOperationException();
    }
    public final boolean equals(Object o) {
      Object k, v, u;
      Map.Entry<?, ?> e;
      return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?, ?>) o).getKey()) != null
          && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u)));
    }
    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K, V> find(int h, Object k) {
      Node<K, V> e = this;
      if (k != null) {
        do {
          K ek;
          if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
            return e;
        } while ((e = e.next) != null);
      }
      return null;
    }
  }

TreeNode

我们在学习HashMap的时候就知道,HashMap的核心数据结构就是链表。在ConcurrentHashMap中就不一样了,如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。所以TreeNode也必须是ConcurrentHashMap的一个核心类,其为树节点类,定义如下:

static final class TreeNode<K, V> extends Node<K, V> {
    TreeNode<K, V> parent; // red-black tree links
    TreeNode<K, V> left;
    TreeNode<K, V> right;
    TreeNode<K, V> prev; // needed to unlink next upon deletion
    boolean red;
    TreeNode(int hash, K key, V val, Node<K, V> next, TreeNode<K, V> parent) {
      super(hash, key, val, next);
      this.parent = parent;
    }
    Node<K, V> find(int h, Object k) {
      return findTreeNode(h, k, null);
    }
    /**
     *  查找hash为h,key为k的节点
     */
    final TreeNode<K, V> findTreeNode(int h, Object k, Class<?> kc) {
      if (k != null) {
        TreeNode<K, V> p = this;
        do {
          int ph, dir;
          K pk;
          TreeNode<K, V> q;
          TreeNode<K, V> pl = p.left, pr = p.right;
          if ((ph = p.hash) > h)
            p = pl;
          else if (ph < h)
            p = pr;
          else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
            return p;
          else if (pl == null)
            p = pr;
          else if (pr == null)
            p = pl;
          else if ((kc != null || (kc = comparableClassFor(k)) != null)
              && (dir = compareComparables(kc, k, pk)) != 0)
            p = (dir < 0) ? pl : pr;
          else if ((q = pr.findTreeNode(h, k, kc)) != null)
            return q;
          else
            p = pl;
        } while (p != null);
      }
      return null;
    }
  }

源码展示TreeNode继承Node,且提供了findTreeNode用来查找查找hash为h,key为k的节点。

TreeBin

该类并不负责key-value的键值对包装,它用于在链表转换为红黑树时包装TreeNode节点,也就是说ConcurrentHashMap红黑树存放是TreeBin,不是TreeNode。该类封装了一系列的方法,包括putTreeVal、lookRoot、UNlookRoot、remove、balanceInsetion、balanceDeletion。由于TreeBin的代码太长我们这里只展示构造方法(构造方法就是构造红黑树的过程):

  static final class TreeBin<K, V> extends Node<K, V> {
    TreeNode<K, V> root;
    volatile TreeNode<K, V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
    static int tieBreakOrder(Object a, Object b) {
      int d;
      if (a == null || b == null || (d = a.getClass().getName().compareTo(b.getClass().getName())) == 0)
        d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1);
      return d;
    }
               //构造方法就是在构造一个红黑树的过程。
    TreeBin(TreeNode<K, V> b) {
      super(TREEBIN, null, null, null);
      this.first = b;
      TreeNode<K, V> r = null;
      for (TreeNode<K, V> x = b, next; x != null; x = next) {
        next = (TreeNode<K, V>) x.next;
        x.left = x.right = null;
        if (r == null) {
          x.parent = null;
          x.red = false;
          r = x;
        } else {
          K k = x.key;
          int h = x.hash;
          Class<?> kc = null;
          for (TreeNode<K, V> p = r;;) {
            int dir, ph;
            K pk = p.key;
            if ((ph = p.hash) > h)
              dir = -1;
            else if (ph < h)
              dir = 1;
            else if ((kc == null && (kc = comparableClassFor(k)) == null)
                || (dir = compareComparables(kc, k, pk)) == 0)
              dir = tieBreakOrder(k, pk);
            TreeNode<K, V> xp = p;
            if ((p = (dir <= 0) ? p.left : p.right) == null) {
              x.parent = xp;
              if (dir <= 0)
                xp.left = x;
              else
                xp.right = x;
              r = balanceInsertion(r, x);
              break;
            }
          }
        }
      }
      this.root = r;
      assert checkInvariants(root);
    }
    private final void lockRoot() {
       //略
    }
    private final void unlockRoot() {
       //略
    }
    private final void contendedLock() {
                       //略
    }
    final Node<K, V> find(int h, Object k) {
                        //略
    }
    final TreeNode<K, V> putTreeVal(int h, K k, V v) {
                       //略
    }
    final boolean removeTreeNode(TreeNode<K, V> p) {
                      //略
    }
    static <K, V> TreeNode<K, V> rotateLeft(TreeNode<K, V> root, TreeNode<K, V> p) {
                     //略
    }      
    static <K, V> TreeNode<K, V> rotateRight(TreeNode<K, V> root, TreeNode<K, V> p) {
                     //略
    }
    static <K, V> TreeNode<K, V> balanceInsertion(TreeNode<K, V> root, TreeNode<K, V> x) {
                      //略
    }
    static <K, V> TreeNode<K, V> balanceDeletion(TreeNode<K, V> root, TreeNode<K, V> x) {
       //略
    }
    static <K, V> boolean checkInvariants(TreeNode<K, V> t) {
        //略
    }
  }
相关文章
|
6月前
|
安全 前端开发 Java
JDK源码级别彻底剖析JVM类加载机制
JDK源码级别彻底剖析JVM类加载机制
|
6月前
|
缓存 Dubbo Java
趁同事上厕所的时间,看完了 Dubbo SPI 的源码,瞬间觉得 JDK SPI 不香了
趁同事上厕所的时间,看完了 Dubbo SPI 的源码,瞬间觉得 JDK SPI 不香了
|
2月前
|
监控 Java 开发者
【并发编程的终极简化】JDK 22结构化并发:让并发编程变得像写代码一样简单!
【9月更文挑战第8天】随着JDK 22的发布,结构化并发为Java编程带来了全新的并发编程体验。它不仅简化了并发编程的复杂性,提高了程序的可靠性和可观察性,还为开发者们提供了更加高效、简单的并发编程方式。我们相信,在未来的发展中,结构化并发将成为Java并发编程的主流方式之一,推动Java编程语言的进一步发展。让我们共同期待Java在并发编程领域的更多创新和突破!
|
3月前
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
53 2
|
5月前
|
Java Spring
深入解析Spring源码,揭示JDK动态代理的工作原理。
深入解析Spring源码,揭示JDK动态代理的工作原理。
59 0
|
6月前
|
设计模式 Java
根据JDK源码Calendar来看工厂模式和建造者模式
根据JDK源码Calendar来看工厂模式和建造者模式
|
6月前
|
算法 Java 索引
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
67 0
|
6月前
|
Java Linux iOS开发
Spring5源码(27)-静态代理模式和JDK、CGLIB动态代理
Spring5源码(27)-静态代理模式和JDK、CGLIB动态代理
48 0
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
127 1
|
6月前
|
算法 安全 Java
ConcurrentLinkedQueue的源码解析(基于JDK1.8)
ConcurrentLinkedQueue的源码解析(基于JDK1.8) ConcurrentLinkedQueue是Java集合框架中的一种线程安全的队列,它是通过CAS(Compare and Swap)算法实现的并发队列。在并发场景下,ConcurrentLinkedQueue能够保证队列的线程安全性,同时性能也很不错。