【Java面试】ConcurrentHashMap再JDK7和8中的区别以及ConcurrentHashMap底层实现(一)

简介: 【Java面试】ConcurrentHashMap再JDK7和8中的区别以及ConcurrentHashMap底层实现

如果还不了解ConcurrentHashMap的可以看: ConcurrentHashMap概述

ConcurrentHashMap在jdk1.7中的设计

再JDK7中,ConcurrentHashMap使用的是segments+table+链表的结构。

其中对每一个segment进行加锁,那么只要访问的是不同的segment,就可以实现并发访问hashmap的能力了。

每一个segment都是一个HashEntry<K,V>[] table, table中的每一个元素本质上都是一个HashEntry的

单向队列。比如table[3]为首节点,table[3]->next为节点1,之后为节点2,依次类推。

如果不懂volatile 关键字的作用,可以看:volatile关键字作用

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
  // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目
  的是对于put, remove等操作,可以减少并发冲突,对
  // 不属于同一个片段的节点可以并发操作,大大提高了性能
  final Segment<K,V>[] segments;
  // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了
  ReentrantLock, 可以作为互拆锁使用
  static final class Segment<K,V> extends ReentrantLock implements Serializable
  {
  transient volatile HashEntry<K,V>[] table;
  transient int count;
  }
  // 基本节点,存储Key, Value值
  static final class HashEntry<K,V> {
  final int hash;
  final K key;
  volatile V value;
  volatile HashEntry<K,V> next;
  }
}

在jdk1.8中做的改进

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用

table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于

hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均

匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然

ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存

在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

为了说明以上2个改动,看一下put操作是如何实现的。

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新
建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成
后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即
可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可

方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。

ConcurrentHashMap jdk1.7、jdk1.8性能比较

public class CompareConcurrentHashMap {
private static ConcurrentHashMap<String, Integer> map = new
ConcurrentHashMap<String, Integer>(40000);
public static void putPerformance(int index, int num) {
for (int i = index; i < (num + index) ; i++)
map.put(String.valueOf(i), i);
}
public static void getPerformance2() {
long start = System.currentTimeMillis();
for (int i = 0; i < 400000; i++)
map.get(String.valueOf(i));
long end = System.currentTimeMillis();
System.out.println("get: it costs " + (end - start) + " ms");
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final CountDownLatch cdLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
final int finalI = i;
new Thread(new Runnable() {
public void run() {
CompareConcurrentHashMap.putPerformance(100000 * finalI,
100000);
cdLatch.countDown();
}
}).start();
}
cdLatch.await();
long end = System.currentTimeMillis();
System.out.println("put: it costs " + (end - start) + " ms");
CompareConcurrentHashMap.getPerformance2();
}
}

程序运行多次后取平均值,结果如下:

谈谈ConcurrentHashMap1.7和1.8的不同实现

在多线程环境下,使用 HashMap 进行 put 操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用 ConcurrentHashMap 代替 HashMap ,为了对更深入的了解,本文将对JDK1.7和1.8的不同实现进行分析。

JDK1.7

数据结构

jdk1.7中采用 Segment + HashEntry 的方式进行实现,结构如下:

ConcurrentHashMap 初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数

组的大小 cap ,并初始化 Segment 数组的第一个元素;其中 ssize 大小为2的幂次方,默认为16, cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量 initialCapacity 进行计算,计算过

程如下:

if (c * ssize < initialCapacity)
  ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
  cap <<= 1;

其中 Segment 在实现上继承了 ReentrantLock ,这样就自带了锁的功能。

put实现

当执行 put 方法插入数据时,根据key的hash值,在 Segment 数组中找到相应的位置,如果相应位置的Segment 还未初始化,则通过CAS(Compare and Swap,是一个原子操作)进行赋值,接着执行 Segment 对象的 put 方法通过加锁机制插入数据,实现如下:

场景:线程A和线程B同时执行相同 Segment 对象的 put 方法

1、线程A执行 tryLock() 方法成功获取锁,则把 HashEntry 对象插入到相应的位置;

2、线程B获取锁失败,则执行 scanAndLockForPut() 方法,在 scanAndLockForPut 方法中,会通过

重复执行 tryLock() 方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当

执行 tryLock() 方法的次数超过上限时,则执行 lock() 方法挂起线程B;

3、当线程A执行完插入操作时,会通过 unlock() 方法释放锁,接着唤醒线程B继续执行;

size实现

因为 ConcurrentHashMap 是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个 Segment 对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个 Segment 的元素个数时,已经计算过的 Segment 同时可能有数据的插入或者删除,在1.7的实现中,采用了如下方式:

try {
  for (;;) {
  if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
      ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
      sum += seg.modCount;
      int c = seg.count;
      if (c < 0 || (size += c) < 0)
        overflow = true;
      }
    }
  if (sum == last)
    break;
    last = sum;
  }
} finally {
  if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
  }

先采用不加锁的方式,连续计算元素的个数,最多计算3次:

1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;

2、如果前后两次计算结果都不同,则给每个 Segment 进行加锁,再计算一次元素的个数;

JDK1.8

数据结构

1.8中放弃了 Segment 臃肿的设计,取而代之的是采用 Node + CAS + Synchronized (JDK1.6之后优化了锁机制)来保证并发安全进行实现,结构如下:

只有在执行第一次 put 方法时才会调用 initTable() 初始化 Node 数组,实现如下:

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
      Thread.yield(); // lost initialization race; just spin
      else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    try {
      if ((tab = table) == null || tab.length == 0) {
      int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
      table = tab = nt;
      sc = n - (n >>> 2);
      }
    } finally {
      sizeCtl = sc;
    }
    break;
    }
  }
  return tab;
}

put实现

当执行 put 方法插入数据时,根据key的hash值,在 Node 数组中找到相应的位置,实现如下:

1、如果相应位置的 Node 还未初始化,则通过CAS插入相应的数据;

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
}

2、如果相应位置的 Node 不为空,且当前该节点不处于移动状态,则对该节点加 synchronized 锁,如果该节点的 hash 不小于0,则遍历链表更新节点或插入新节点;

if (fh >= 0) {
  binCount = 1;
  for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
  }
  Node<K,V> pred = e;
    if ((e = e.next) == null) {
      pred.next = new Node<K,V>(hash, key, value, null);
      break;
    }
  }
}


相关文章
|
6天前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
11天前
|
存储 缓存 Oracle
Java I/O流面试之道
NIO的出现在于提高IO的速度,它相比传统的输入/输出流速度更快。NIO通过管道Channel和缓冲器Buffer来处理数据,可以把管道当成一个矿藏,缓冲器就是矿藏里的卡车。程序通过管道里的缓冲器进行数据交互,而不直接处理数据。程序要么从缓冲器获取数据,要么输入数据到缓冲器。
Java I/O流面试之道
|
1天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
9 2
|
7天前
|
存储 缓存 Java
大厂面试必看!Java基本数据类型和包装类的那些坑
本文介绍了Java中的基本数据类型和包装类,包括整数类型、浮点数类型、字符类型和布尔类型。详细讲解了每种类型的特性和应用场景,并探讨了包装类的引入原因、装箱与拆箱机制以及缓存机制。最后总结了面试中常见的相关考点,帮助读者更好地理解和应对面试中的问题。
26 4
|
8天前
|
存储 Java 程序员
Java基础的灵魂——Object类方法详解(社招面试不踩坑)
本文介绍了Java中`Object`类的几个重要方法,包括`toString`、`equals`、`hashCode`、`finalize`、`clone`、`getClass`、`notify`和`wait`。这些方法是面试中的常考点,掌握它们有助于理解Java对象的行为和实现多线程编程。作者通过具体示例和应用场景,详细解析了每个方法的作用和重写技巧,帮助读者更好地应对面试和技术开发。
41 4
|
3月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
1月前
|
算法 Java 数据中心
探讨面试常见问题雪花算法、时钟回拨问题,java中优雅的实现方式
【10月更文挑战第2天】在大数据量系统中,分布式ID生成是一个关键问题。为了保证在分布式环境下生成的ID唯一、有序且高效,业界提出了多种解决方案,其中雪花算法(Snowflake Algorithm)是一种广泛应用的分布式ID生成算法。本文将详细介绍雪花算法的原理、实现及其处理时钟回拨问题的方法,并提供Java代码示例。
66 2
|
1月前
|
JSON 安全 前端开发
第二次面试总结 - 宏汉科技 - Java后端开发
本文是作者对宏汉科技Java后端开发岗位的第二次面试总结,面试结果不理想,主要原因是Java基础知识掌握不牢固,文章详细列出了面试中被问到的技术问题及答案,包括字符串相关函数、抽象类与接口的区别、Java创建线程池的方式、回调函数、函数式接口、反射以及Java中的集合等。
27 0
|
3月前
|
XML 存储 JSON
【IO面试题 六】、 除了Java自带的序列化之外,你还了解哪些序列化工具?
除了Java自带的序列化,常见的序列化工具还包括JSON(如jackson、gson、fastjson)、Protobuf、Thrift和Avro,各具特点,适用于不同的应用场景和性能需求。
|
3月前
|
Java
【Java基础面试三十七】、说一说Java的异常机制
这篇文章介绍了Java异常机制的三个主要方面:异常处理(使用try、catch、finally语句)、抛出异常(使用throw和throws关键字)、以及异常跟踪栈(异常传播和程序终止时的栈信息输出)。