【Java面试】ConcurrentHashMap再JDK7和8中的区别以及ConcurrentHashMap底层实现(一)

简介: 【Java面试】ConcurrentHashMap再JDK7和8中的区别以及ConcurrentHashMap底层实现

如果还不了解ConcurrentHashMap的可以看: ConcurrentHashMap概述

ConcurrentHashMap在jdk1.7中的设计

再JDK7中,ConcurrentHashMap使用的是segments+table+链表的结构。

其中对每一个segment进行加锁,那么只要访问的是不同的segment,就可以实现并发访问hashmap的能力了。

每一个segment都是一个HashEntry<K,V>[] table, table中的每一个元素本质上都是一个HashEntry的

单向队列。比如table[3]为首节点,table[3]->next为节点1,之后为节点2,依次类推。

如果不懂volatile 关键字的作用,可以看:volatile关键字作用

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
  // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目
  的是对于put, remove等操作,可以减少并发冲突,对
  // 不属于同一个片段的节点可以并发操作,大大提高了性能
  final Segment<K,V>[] segments;
  // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了
  ReentrantLock, 可以作为互拆锁使用
  static final class Segment<K,V> extends ReentrantLock implements Serializable
  {
  transient volatile HashEntry<K,V>[] table;
  transient int count;
  }
  // 基本节点,存储Key, Value值
  static final class HashEntry<K,V> {
  final int hash;
  final K key;
  volatile V value;
  volatile HashEntry<K,V> next;
  }
}

在jdk1.8中做的改进

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用

table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于

hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均

匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然

ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存

在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

为了说明以上2个改动,看一下put操作是如何实现的。

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新
建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成
后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即
可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可

方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。

ConcurrentHashMap jdk1.7、jdk1.8性能比较

public class CompareConcurrentHashMap {
private static ConcurrentHashMap<String, Integer> map = new
ConcurrentHashMap<String, Integer>(40000);
public static void putPerformance(int index, int num) {
for (int i = index; i < (num + index) ; i++)
map.put(String.valueOf(i), i);
}
public static void getPerformance2() {
long start = System.currentTimeMillis();
for (int i = 0; i < 400000; i++)
map.get(String.valueOf(i));
long end = System.currentTimeMillis();
System.out.println("get: it costs " + (end - start) + " ms");
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final CountDownLatch cdLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
final int finalI = i;
new Thread(new Runnable() {
public void run() {
CompareConcurrentHashMap.putPerformance(100000 * finalI,
100000);
cdLatch.countDown();
}
}).start();
}
cdLatch.await();
long end = System.currentTimeMillis();
System.out.println("put: it costs " + (end - start) + " ms");
CompareConcurrentHashMap.getPerformance2();
}
}

程序运行多次后取平均值,结果如下:

谈谈ConcurrentHashMap1.7和1.8的不同实现

在多线程环境下,使用 HashMap 进行 put 操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用 ConcurrentHashMap 代替 HashMap ,为了对更深入的了解,本文将对JDK1.7和1.8的不同实现进行分析。

JDK1.7

数据结构

jdk1.7中采用 Segment + HashEntry 的方式进行实现,结构如下:

ConcurrentHashMap 初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数

组的大小 cap ,并初始化 Segment 数组的第一个元素;其中 ssize 大小为2的幂次方,默认为16, cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量 initialCapacity 进行计算,计算过

程如下:

if (c * ssize < initialCapacity)
  ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
  cap <<= 1;

其中 Segment 在实现上继承了 ReentrantLock ,这样就自带了锁的功能。

put实现

当执行 put 方法插入数据时,根据key的hash值,在 Segment 数组中找到相应的位置,如果相应位置的Segment 还未初始化,则通过CAS(Compare and Swap,是一个原子操作)进行赋值,接着执行 Segment 对象的 put 方法通过加锁机制插入数据,实现如下:

场景:线程A和线程B同时执行相同 Segment 对象的 put 方法

1、线程A执行 tryLock() 方法成功获取锁,则把 HashEntry 对象插入到相应的位置;

2、线程B获取锁失败,则执行 scanAndLockForPut() 方法,在 scanAndLockForPut 方法中,会通过

重复执行 tryLock() 方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当

执行 tryLock() 方法的次数超过上限时,则执行 lock() 方法挂起线程B;

3、当线程A执行完插入操作时,会通过 unlock() 方法释放锁,接着唤醒线程B继续执行;

size实现

因为 ConcurrentHashMap 是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个 Segment 对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个 Segment 的元素个数时,已经计算过的 Segment 同时可能有数据的插入或者删除,在1.7的实现中,采用了如下方式:

try {
  for (;;) {
  if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
      ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
      sum += seg.modCount;
      int c = seg.count;
      if (c < 0 || (size += c) < 0)
        overflow = true;
      }
    }
  if (sum == last)
    break;
    last = sum;
  }
} finally {
  if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
  }

先采用不加锁的方式,连续计算元素的个数,最多计算3次:

1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;

2、如果前后两次计算结果都不同,则给每个 Segment 进行加锁,再计算一次元素的个数;

JDK1.8

数据结构

1.8中放弃了 Segment 臃肿的设计,取而代之的是采用 Node + CAS + Synchronized (JDK1.6之后优化了锁机制)来保证并发安全进行实现,结构如下:

只有在执行第一次 put 方法时才会调用 initTable() 初始化 Node 数组,实现如下:

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
      Thread.yield(); // lost initialization race; just spin
      else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    try {
      if ((tab = table) == null || tab.length == 0) {
      int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
      table = tab = nt;
      sc = n - (n >>> 2);
      }
    } finally {
      sizeCtl = sc;
    }
    break;
    }
  }
  return tab;
}

put实现

当执行 put 方法插入数据时,根据key的hash值,在 Node 数组中找到相应的位置,实现如下:

1、如果相应位置的 Node 还未初始化,则通过CAS插入相应的数据;

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
}

2、如果相应位置的 Node 不为空,且当前该节点不处于移动状态,则对该节点加 synchronized 锁,如果该节点的 hash 不小于0,则遍历链表更新节点或插入新节点;

if (fh >= 0) {
  binCount = 1;
  for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
  }
  Node<K,V> pred = e;
    if ((e = e.next) == null) {
      pred.next = new Node<K,V>(hash, key, value, null);
      break;
    }
  }
}


相关文章
|
8天前
|
Java
Java面向对象实践小结(含面试题)(下)
Java面向对象实践小结(含面试题)(下)
20 1
|
9天前
|
安全 Java
【JAVA】在 Queue 中 poll()和 remove()有什么区别
【JAVA】在 Queue 中 poll()和 remove()有什么区别
|
2天前
|
Oracle Java 关系型数据库
windows 下 win11 JDK17安装与环境变量的配置(配置简单详细,包含IJ中java文件如何使用命令运行)
本文介绍了Windows 11中安装JDK 17的步骤,包括从官方网站下载JDK、配置环境变量以及验证安装是否成功。首先,下载JDK 17的安装文件,如果没有Oracle账户,可以直接解压缩文件到指定目录。接着,配置系统环境变量,新建`JAVA_HOME`变量指向JDK安装路径,并在`Path`变量中添加。然后,通过命令行(cmd)验证安装,分别输入`java -version`和`javac -version`检查版本信息。最后,作者分享了如何在任意位置运行Java代码,包括在IntelliJ IDEA(IJ)中创建的Java文件,只需去掉包声明,就可以通过命令行直接运行。
19 0
|
6天前
|
Java
Java中 a+=b和a=a+b有什么区别?
Java中 a+=b和a=a+b有什么区别?
|
6天前
|
Java 开发者
Java中三种Set的实现类的用法和区别
Java中三种Set的实现类的用法和区别
|
7天前
|
小程序 Java 程序员
【Java探索之旅】我与Java的初相识(二):程序结构与运行关系和JDK,JRE,JVM的关系
【Java探索之旅】我与Java的初相识(二):程序结构与运行关系和JDK,JRE,JVM的关系
19 0
|
8天前
|
存储 缓存 开发框架
Java基础面试题小结(下)
Java基础面试题小结(下)
16 0
|
8天前
|
存储 安全 Java
Java基础面试题小结(上)
Java基础面试题小结(上)
15 0
|
8天前
|
设计模式 Java
Java面向对象实践小结(含面试题)(上)
Java面向对象实践小结(含面试题)
13 1
|
8天前
|
Java Windows
java——安装JDK及配置解决常见问题
java——安装JDK及配置解决常见问题