认真学习jdk1.8下ConcurrentHashMap的扩容机制

简介: 认真学习jdk1.8下ConcurrentHashMap的扩容机制

jdk1.7下的rehash会对某段的某个数组进行二倍扩容,然后把链表拆分放到数组的不同位置。jdk1.8下ConcurrentHashMap的扩容就要麻烦了。


首先在链表转化为树的时候,会判断tab.length<64,如果tab.length<64,那么不会转化为树而是会触发tryPresize(n << 1),n=tab.length,这个方法可能会初始化tab(比如使用给定的map进行初始化ConcurrentHashMap,此时tab还没有进行实例化),也可能会触发扩容。


这里再回顾一下sizeCtl,是控制标识符,不同的值表示不同的意义:


负数代表正在进行初始化或者扩容操作,其中-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。

整数或者0表示初始化或者下一次扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的,实际容量>=sizeCtl,则扩容。

【1】tryPresize

在putAll或者treeifyBin方法中会触发tryPresize。尝试预先调整表的大小,以容纳给定数量的元素。其核心功能就是:


  • 确定扩容后数组大小
  • 如果数组未初始化那么进行初始化
  • 如果tab已经初始化那么可能会帮着扩容(如果正在扩容中)
//size = n << 1,n = tab.length
private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
      // 1.5*size + 1 =》tableSizeFor 是为了2的N次幂 a power of two table
      // 也就是1.5*size+1  最近的一个2的N次幂
      //也就是3n+1 最近的一个2的N次幂,当n=16,那么c=64
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // 这里是while !整数或者0表示tab还没有被初始化,这个数值表示初始化或者下一次扩容的大小
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        //如果tab还没有初始化,那么先初始化tab,长度要么为SC,要么为C
        if (tab == null || (n = tab.length) == 0) {
          //更新N:如果记录的阈值大于C,那么取阈值,否则取C
            n = (sc > c) ? sc : c;
            // 更新SIZECTL的值,如果为SC,就从SC 变为 -1,表示正在扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                // 也就是table没有被改变,初始化数组赋予table
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // sc= 0.75n
                        sc = n - (n >>> 2);
                    }
                } finally {
                // 更新sizeCtl为0.75n ,其实也就是默认负载因子下的阈值
                    sizeCtl = sc;
                }
            }
        }
// 如果c<=SC说明无需扩容,如果n >= MAXIMUM_CAPACITY,说明没办法扩容,已经上限了
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
       // 如果C > SC 且tab已经初始化,尝试进行扩容    
        else if (tab == table) {
          //获取当前 N 下的扩容戳,假设N为16 ,那么RS=32795
            int rs = resizeStamp(n);
            //说明目前有其他线程在扩容--其实这个几率可以忽略的
//(sc = sizeCtl) >= 0说明要么还没有初始化,要么已经初始化过并且不在扩容ing
//代码执行到这里sc变为了0说明有其他线程在扩容,这个几率太低太低了
            if (sc < 0) {
                Node<K,V>[] nt;
                // 如果属于下面几种情况,直接break-这里也是有bug的
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                 // 否则就更新   SIZECTL的值从sc--sc+1,表示多了一个线程进行扩容
                 // 也就是helpTransfer
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                  //扩容的实际方法,nt=nextTable
                    transfer(tab, nt);
            }
            //否则说明当前是第一个扩容的线程,
            //更新SIZECTL的值从SC -- rs << RESIZE_STAMP_SHIFT) + 2
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
             // rs=32795 , 32795<< 16 等于 -2145714176       
             // 也就是说 (rs << RESIZE_STAMP_SHIFT) + 2 = -2145714174                
                transfer(tab, null);
        }
    }
}



if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)用来判断五种情况:


如果(sc >>> RESIZE_STAMP_SHIFT) != rs ,说明当前线程获取到的扩容唯一标识戳 非 本批次扩容(个人理解jdk想表达的意思);


或者 sc == rs + 1 (扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==(rs <<16) + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 (rs<<16) + 1所以个人认为这里表达式应该是sc == ( rs << RESIZE_STAMP_SHIFT) + 1(个人理解jdk想表达的意思)。


或者 sc == rs + 65535 如果达到最大帮助线程的数量就结束,应该是(rs << 16 )+65535)


(nt = nextTable) == null,直接放弃扩容,nextTable表示扩容后的数组,只有在扩容进行时其才不为null。(其实这个判断无所谓的,因为nextTable在transfer中会再次判断如果为null就进行初始化。而且在并发环境下tryPresize方法中的nextTable确实可能为null)。


或者transferIndex <= 0,transferIndex 在transfer方法中会被赋值,记录nextIndex,小于0说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干

RESIZE_STAMP_BITS=16
RESIZE_STAMP_SHIFT=32-RESIZE_STAMP_BITS
MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 =2^16-1 ;

这里确实是有问题的,在认真学习jdk1.8下ConcurrentHashMap的实现原理博文中我们分析helpTransfer方法时分析过条件(sc >>> RESIZE_STAMP_SHIFT) != rs 、 sc == rs + 1 以及 sc == rs + MAX_RESIZERS,这是一个bug,在jdk12已经修复。其实在jdk12中,tryPresize方法也发生了改变,如下所示:

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
        // 可以看到在jdk12中,这里精简了很多,直接作为第一个扩容线程触发调整
            int rs = resizeStamp(n);
            if (U.compareAndSetInt(this, SIZECTL, sc,
                                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}


我们接下来看看transfer方法。

【2】transfer

与HashMap不同的时,ConcurrentHashMap并没有一个resize方法。其扩容后的数组也就是newTab(nextTable)是在调整(transfer)过程中实例化的(如果nextTable为null的话)。


① 触发扩容的时机

有以下几个地方触发扩容:


addCount(long x, int check)

helpTransfer(Node<K,V>[] tab, Node<K,V> f)

tryPresize(int size),其中tryPresize方法在treeifyBin或者putAll(Map<? extends K, ? extends V> m)会被触发

② 扩容方法

//NCPU 就是可用处理器个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//最小调整步幅,>=DEFAULT_CAPACITY=16
private static final int MIN_TRANSFER_STRIDE = 16;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  //记录调整前tab.length
    int n = tab.length, stride;
    // MIN_TRANSFER_STRIDE=16,确定调整步幅,小于16则重置为16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //如果 nextTab 未初始化,则初始化为2N大小数组   
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //需要调整的槽位个数,n 是调整前tab.length
        transferIndex = n;
    }
    // 记录扩容后数组大小
    int nextn = nextTab.length;
    // 包装一个ForwardingNode(-1,null,null,null),
    // ForwardingNode.nextTable=nextTab
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //表示是否可以进行调整
    boolean advance = true;
  //记录是否完成扩容/调整
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 也就是一个线程调整,只能调整 [bound,nextIndex)范围的索引
            //最多允许nextIndex/stride 个线程并发调整
            if (--i >= bound || finishing)
                advance = false;
            // 没有可以调整的“槽位”了,这里给nextIndex赋了初值 = transferIndex  
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 修改TRANSFERINDEX,从 nextIndex --> nextIndex - stride or 0
            // nextIndex = transferIndex=oldTab.length
            // 如果有新的线程进来,可能会获取新的段 nextIndex - stride - stride 
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 如果扩容结束 这里是唯一结束循环的地方!!!
            if (finishing) {
                nextTable = null; //table指向新的nextTab , nextTable释放
                table = nextTab;
                // 1.5n,扩容后的tab.length=2n  ,仍旧保持0.75比例
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 修改SIZECTL = SIZECTL -1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
              // SIZECTL=(rs<<16)+2,进入方法前调整了SIZECTL为(rs<<16)+2
              // rs=resizeStamp(n),这里进行校验是否扩容结束跳出循环
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
---上面过程除了得到finishing 、advance 标志外,
---最重要的就是获取槽位索引 i 的值,下面就是对槽位进行调整
        //如果 i 这个槽位为null,那么直接把fwd放进去
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 如果 i 这个槽位不为null,且hash=MOVED表示正在扩容/调整    
        //说明 f 就是一个ForwardingNode
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
          //对tab[i]位置进行加锁  ,也就是对数组的一个槽位(索引位置)加锁
            synchronized (f) {
              //再次判断是否相等
                if (tabAt(tab, i) == f) {
// 这下面对链表、树进行处理的过程和HashMap十分类似 !!
                    Node<K,V> ln, hn;
                    //说明是链表
                    if (fh >= 0) {
                      //和HashMap一样,用来判断位置是否发生改变
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
            //遍历,确定第一个(hash & n) != (fh & n) 的元素  
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //位置未发生改变的
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        //位置发生改变的
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        //遍历直到lastRun
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                            //(hash,key,value,next)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                            //头插法形成链
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //分别把两个链表放到新数组的两个位置
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组的桶设置成 forwardingNode, 使得迁移过程中的
                        //get线程可以通过 forwardingNode 访问到已经迁移到新链表的元素
                        setTabAt(tab, i, fwd);
            //调整其为true,使当前线程可以再次进入while
                        advance = true;
                    }
                    // 如果是树
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        // 位置不发生变化的头和尾
                        TreeNode<K,V> lo = null, loTail = null;
                        //位置发生变化的头和尾
                        TreeNode<K,V> hi = null, hiTail = null;
                        //统计个数,用来判断是否进行树化和去树化(链表化)
                        int lc = 0, hc = 0;
                        //从first开始,一路next往下
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                             //位置没有发生变化的-位置指的是槽位   
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                 //一路串到后面   
                                loTail = p;
                                //个数++
                                ++lc;
                            }
                            //槽位(索引位置发生改变的)
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //根据lc hc判断是否进行树化 、链表化
                        // ln hn 这里记录的是槽位结点
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //分别放到新数组的两个不同槽位上    
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 旧数组的 i 槽位放上fwd
                        setTabAt(tab, i, fwd);
                        // 修改为true,表示本次结束
                        advance = true;
                    }
                }
            }
        }
    }
}


关于链表和树的分割以及为什么使用 hash & n==0进行判断位置是否变化,下面我们总结分析一些问题。

① 最大允许多少个并发?

如下代码所示,那么最多允许oldTab.length/stride 个线程并发调整,每个线程处理的索引范围是[bound , nextIndex-1]

// nextIndex transferIndex 的初始值
nextIndex = transferIndex = oldTab.length
// 每个线程更新范围
U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, 
      nextBound = (nextIndex > stride ?nextIndex - stride : 0))
// nextIndex - stride 或者0  ,也就是下限
bound = nextBound;
// 其实就是上限
i = nextIndex - 1;

假设当前tab.length=n=32,那么下一个调整的nextTab.length=64。假设stride=16,此时nextIndex=transferIndex=32。允许两个并发线程,调整的槽位的范围为[0,15],[16,31],左右都是闭区间。


② 锁机制

在transfer方法中我们可以到处看到CAS思想,比如advance = casTabAt(tab, i, null, fwd);。此外,在对槽位进行调整的时候对槽位结点进行了重锁,如下所示:



③ 如何结束循环?


在进入transfer前,如果ConcurrentHashMap已经正在扩容,那么更新SIZECTL=sc+1,否则就作为第一个扩容的检测,更新SIZECTL=(rs << RESIZE_STAMP_SHIFT) + 2),其中rs= resizeStamp(n),n 是调整前tab.length。


384b8a929c904b338cfa04eee067d2aa.png


如下所示,假设当前并发15个线程进行扩容(假设每个线程的执行时间严格一致)。


① 如果不是最后一个扩容线程执行该方法,那么这里首先记录sc = sizeCtl然后会更新SIZECTL=sc-1, return结束当前循环,但是不会终止并发扩容。

② 如果是最后一个扩容线程进入该方法,除了同样进行 sc - 1 外,此时判断(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT为真,那么会更新finishing = advance = true; i =n,说明并发扩容结束。

也就是理想情况,第一个线程更新SIZECTL=(rs << RESIZE_STAMP_SHIFT) + 2),最后一个线程结束并发扩容。

// 修改SIZECTL = SIZECTL -1,也就是扩容的线程数量 - 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  // SIZECTL=(rs<<16)+2,进入方法前调整了SIZECTL为(rs<<16)+2
  // rs=resizeStamp(n),这里进行校验
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
        return;
    finishing = advance = true;
    i = n; // recheck before commit
}

④ 三目运算符嵌套


如下所示,这里关于树化和链表化进行了三目运算符的嵌套。我们回顾一下这点知识:三目运算符是右结合的,也就是靠右侧的先结合为一个表达式。

a ? b : c ? d : e 其实等价于 a ? b : (c ? d : e)

ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    (lc != 0) ? new TreeBin<K,V>(hi) : t;


等价于:

ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
   ( (hc != 0) ? new TreeBin<K,V>(lo) : t);
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  (  (lc != 0) ? new TreeBin<K,V>(hi) : t);


⑤ nextIndex、 transferIndex与TRANSFERINDEX

nextIndex=transferIndex进行了初始赋值,TRANSFERINDEX呢则是如下所示指向了transferIndex这个变量。可以这样理解,对TRANSFERINDEX的更新会反映到transferIndex这个变量上,其是ConcurrentHashMap的成员变量使用了volatile 修饰。

U.objectFieldOffset(k.getDeclaredField("transferIndex"))
private transient volatile int transferIndex;

在下图所示代码中,我们可以看到并发线程对nextIndex和TRANSFERINDEX的更新。假设第一个个线程处理的槽位范围是(bound,nextIndex),那么下个线程处理的槽位范围则是(bound-stride,nextIndex-stride),也就是左移了stride(步幅,最小16)。

e1ade085d5f04070b960282f12572d53.png


总结如下:


java 8 中的 ConcurrentHashMap 在扩容时, 为了尽可能的高效, 允许多个线程并发完成旧数组元素至新数组的迁移任务。 为了保证多个线程各自独立执行, 不发生冲突, 迁移任务按照一定的步长(stride), 将桶数组划分成不同的段, 由一个线程或多个线程完成。


扩容刚开始时, transferIndex 指向桶数组末尾, 执行扩容的线程需要以 CAS 方式将 transferIndex 左移(transferIndex = transferIndex - stride), 得到需要迁移的 “段”


如果再有线程需要执行扩容, 发现扩容正在进行的时候, 就读取 transferIndex, 以 CAS 方式将 transferIndex 继续左移, 获取下一个需要迁移的**“段”**


transferIndex <= 0 表示所有的迁移任务都已经分配完毕, 新的线程就不能再加入迁移施工队了


【3】扩容过程中的树化和链表化

接上文,当结点 f 是 TreeBin 时,会根据hc 、lc的大小进行链表转化或者红黑树转化。


lc 表示未发生位置变化的元素个数,

hc 表示发生了位置变化的元素个数。

lo 表示未发生位置变化的头结点

hi 表示发生了位置变化的头结点

ln 指向的是未发生位置变化的“槽位结点”

hn 指向的是发生了位置变化的“槽位结点”

ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
   (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
   (lc != 0) ? new TreeBin<K,V>(hi) : t;

① 链表化

其中链表化的代码十分简单如下所示,将所有的结点都使用普通结点Node替代,然后遍历next追加到Node<K,V> b后面形成链表。

static <K,V> Node<K,V> untreeify(Node<K,V> b) {
    Node<K,V> hd = null, tl = null;
    for (Node<K,V> q = b; q != null; q = q.next) {
        Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
        if (tl == null)
            hd = p;
        else
            tl.next = p;
        tl = p;
    }
    return hd;
}


这里我们着重研究红黑树的转化,与HashMap不同的是,这里交给了TreeBin来处理。

② 红黑树化TreeBin

其实我们要研究的也就是new TreeBin<K,V>(hi)做了什么,这里 hiTreeNode


TreeBin内部持有了 first 和 root 结点。如下代码所示,其针对当前结点TreeNode<K,V> b的所有next进行了处理,为每一个结点找到树上一个合适的位置。每一次放到树上的最后都会触发平衡插入balanceInsertion(r, x)来保持红黑树的性质。 这个过程和HashMap的treeify方法十分类似。

TreeBin(TreeNode<K,V> b) {
  // TREEBIN = -2 ,也就是 hash = -2
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    //从结点 b , 一路遍历 next 下去,处理每一个结点
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
      // 记录x.next 
        next = (TreeNode<K,V>)x.next;
        // 当前结点left right置为null
        x.left = x.right = null;
        //如果root为null,那么x 作为root
        if (r == null) {
            x.parent = null;
            x.red = false;//根结点一定是黑色-红黑树性质
            r = x;
        }
        else {
        // 记录 key hash
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            // 从root结点开始往下遍历,为当前结点 X 找到合适的位置
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                     // 和HashMap一样,平衡插入,获取root   
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}


关于结点遍历和树处理的过程这里不再赘述


③ TreeBin的核心属性和方法

我们顺带看一下TreeBin的核心属性和方法。如下所示前面我们分析过first和root结点,此外还有一些线程控制与锁计数。

TreeNode<K,V> root;
volatile TreeNode<K,V> first;
// 当前等待写锁的线程
volatile Thread waiter;
// 记录锁状态 0 表示无锁
volatile int lockState;
// values for lockState lockState的一些值
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock


lockState用于记录锁状态, 当其为 0 时,表示没有锁。

加锁

putTreeValremoveTreeNode方法会触发lockRoot(),这个方法会尝试将LOCKSTATE的值从 0 更新为WRITER(2),表示有线程持有当前结点的写锁。


private final void lockRoot() {
    if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
        contendedLock(); // offload to separate method
}

如果if条件不成功,那么会触发contendedLock()方法,这个方法将会通过自旋更新 LOCKSTATE 的值 为 WRITER 以达到持有写锁的目的。

private final void contendedLock() {
    boolean waiting = false;
    for (int s;;) {
    // ~WAITER = - (WAITER+1),也就是与 1,0011 进行& == 0
    //如果当前线程持有的当前结点 lockState=READER 也就是4,那么不会进入这里分支
        if (((s = lockState) & ~WAITER) == 0) {
            if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                if (waiting)
                    waiter = null;
                return;
            }
        }
        // 也就是与 0010 进行& == 0 时,进入if,比如0 1 4
        else if ((s & WAITER) == 0) {
         // 当前线程进行等待写锁
            if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                waiting = true;
                waiter = Thread.currentThread();
            }
        }
        else if (waiting)
            LockSupport.park(this);
    }
}


解锁

解锁也就是释放当前结点的锁,直接将 lockState 修改为 0 。

private final void unlockRoot() {
    lockState = 0;
}


关于~

以int a =10为例,求得 ~a


正数的原码 = 反码 = 补码

负数的反码 = 原码符号位不变,其它位全取反,负数的补码 = 反码 + 1。


~表示非运算符,就是将该数的所有二进制位全取反。但又由于计算机中是以补码的形式存储的,所以0 1010全取反是1 0101(只是补码形式,还需要转成原码)。

此时得到的1 0101只是补码,我们需要将它先转为反码,反码 = 补码-1,得到反码为1 0100。

我们得到反码后,将它转为原码,原码 = 反码符号位不变,其它位全取反,得到最终的原码为1 1011,转化为十进制就是-11。

目录
相关文章
|
4月前
|
存储 安全 Java
【JDK 源码分析】ConcurrentHashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】ConcurrentHashMap 底层结构
|
3月前
|
Oracle Java 编译器
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
47 1
|
2月前
|
算法 Java 索引
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
【数据结构与算法】4、双向链表(学习 jdk 的 LinkedList 部分源码)
34 0
|
4月前
|
Oracle IDE Java
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)(下)
基本概念【入门、 发展简史、核心优势、各版本的含义、特性和优势、JVM、JRE 和 JDK 】(二)-全面详解(学习总结---从入门到深化)
37 1
|
5月前
|
安全 Java 索引
认真学习jdk1.7下ConcurrentHashMap的实现原理
认真学习jdk1.7下ConcurrentHashMap的实现原理
60 0
|
5月前
|
机器学习/深度学习 存储 Java
认真学习jdk1.8下ConcurrentHashMap的实现原理
认真学习jdk1.8下ConcurrentHashMap的实现原理
33 0
认真学习jdk1.8下ConcurrentHashMap的实现原理
|
8月前
|
Java
从零开始学习 Java:简单易懂的入门指南之JDK8时间相关类(十八)
从零开始学习 Java:简单易懂的入门指南之JDK8时间相关类(十八)
|
10月前
|
安全 Java 容器
JDK 7 ConcurrentHashMap
JDK 7 ConcurrentHashMap
|
10月前
|
安全 Java
JDK 8 ConcurrentHashMap
JDK 8 ConcurrentHashMap
|
3天前
|
弹性计算 运维 Java
一键安装二进制JDK
【4月更文挑战第30天】
6 0