2.5 TreeMap
TreeMap是基于红黑树实现的key-value集合,它的元素是有有序的。
它的内部数据结构如下:
// 比较器对象 private final Comparator<? super K> comparator; // 根节点 private transient Entry<K,V> root; // 集合大小 private transient int size = 0; // 树结构被修改的次数 private transient int modCount = 0; // 静态内部类用来表示节点类型 static final class Entry<K,V> implements Map.Entry<K,V> { K key; V value; Entry<K,V> left; // 指向左子树的引用(指针) Entry<K,V> right; // 指向右子树的引用(指针) Entry<K,V> parent; // 指向父节点的引用(指针) boolean color = BLACK; // 红黑flag }
2.6 ConcurrentHashMap
ConcurrentHashMap是线程安全的HashMap,在Java 1.7中使用Segment分段锁实现。下面是ConcurrentHashMap在Java 1.7中的数据结构:
p ublic class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对 // 不属于同一个片段的节点可以并发操作,大大提高了性能 final Segment<K,V>[] segments; // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互拆锁使用 static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; } // 基本节点,存储Key, Value值 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } }
Java 1.7中,ConcurrentHashMap将数据分段存储,一个ConcurrentHashMap由多个Segment组成,每个Segment都有把锁,同时一个Segment下包含许多Node。一个Segment本质上就是一个继承了ReentrantLock的小的HashMap,因此锁粒度是以Segment为单位的,即以Hash桶的每个位置为单位进行锁操作。这样处于不同Hash段的元素可以并发操作。与HashTable相比,大大提高了性能。
在Java 1.8中,使用CAS操作对ConcurrentHashMap的实现进行了优化。其主要数据结构如下:
transient volatile Node<K,V>[] table; //存储Node private transient volatile Node<K,V>[] nextTable; //扩容时存放数据,大小为table的2倍 private transient volatile int sizeCtl; //控制标志,具有多种用途 static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; }
sizeCtl变量是一个用于同步多个线程的共享变量。当sizeCtl < 0时,表明当前HashMap正在被初始化(-1表示正在初始化)或者正在扩容中(-N表示有N-1个线程正在参与扩容)。如果为正数,则代表了需要扩容时的阀值(即capcity * 0.75)。
下面时HashMap初始化的源码:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
我们可以看到,ConcurrentHashMap通过CAS支持了多个线程同步操作,初始化步骤如下:
1. 如果当前table为null或table.length==0,开始进行初始化。
2. 读取sizeCtl变量,如果小于0,则表明此HashMap正在进行初始化,此时调用Thread.yield()方法让出时间片。
3. 如果sizeCtl变量不小于0,那么通过CAS操作将sizeCtl置为-1,CAS操作成功后,对table数组进行初始化,并将sizeCtl设置为Capcity * 0.75,即下次扩容触发的阀值。
4. 此时其他希望进行初始化的线程读取到table不为null了,即初始化完毕。
put方法主要流程如下:
1. 首先通过(n - 1) & hash
获得下标位置,如果该位置为空,则使用CAS操作直接插入。
2. 如果该位置不为空,且该位置节点的hash值为-1,则代表该链表正在处于扩容阶段,此时放弃插入,直接调用helpTransfer(Node<K,V>[] tab, Node<K,V> f)
方法帮助扩容。
3. 否则,对该位置节点加锁(Synchronized),执行add操作。
4. 如果链表长度超过8个,则调用treeifyBin(Node<K,V>[] tab, int index)
方法将链表转化为红黑树。
扩容时ConcurrentHashMap会将一个ForwardingNode(hash值为-1)放置在原Node位置。这个ForwardingNode存储了
Node<K,V>[] nextTable
的引用。
treeifyBin
方法并不一定会将链表转化为红黑树。如果当前table.length < 64(MIN_TREEIFY_CAPACITY),此时调用tryPresize(n << 1)
方法,对table进行扩容。如果table.length >= 64,此时才会进行红黑树转化操作。在
tryPresize(int size)
方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助其扩容。扩容的核心逻辑位于transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
方法中。
默认情况下,每个线程处理 16 个桶。因此,如果table长度是 16 的时候,扩容的时候只会有一个线程扩容。如果table长度是 64 ,每个线程可以分到 16 个桶,各自处理,不会互相影响。
concurrenthashmap.jpg
下面时get方法的主要代码:
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return n ull; }
get方法主要流程如下:
1. 首先计算出记录的key的hashCode,然后通过(n - 1) & hash
获得下标位置,如果该位置为null,则直接返回。
2. 如果该位置不为null,并且key与我们先要查找的key相等,则直接返回该位置节点的值。
3. 如果该节点的是TreeNode,则说明该位置上是一颗红黑树,则调用红黑树搜索逻辑返回节点值。
4. 否则说明该节点是一个链表,遍历链表,找到要查找的key对应节点,返回该节点的值。
计算Map大小的size方法实现如下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } 我们看到,size()方法调用了sumCount()方法返回Map的大小。下面是sumCount()方法的实现: long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
首先ConcurrentHashMap中维护了一个baseCount
变量用于记录size,每一次put新的元素时,ConcurrentHashMap都会通过addCount()
方法CAS更新baseCount
变量。然而在高并发情况下,很有可能出现put成功,而CAS更新baseCount
失败,那么这些节点虽然已经被添加到哈希表中了,但是数量却没有被统计。
因此,addCount()
方法在更新 baseCount
失败的时候,会调用fullAddCount()
方法将这些失败的结点包装成一个CounterCell
对象,并保存在private transient volatile CounterCell[] counterCells;
数组中。那么整张表实际的 size 其实是baseCount
加上CounterCell
数组中元素的值之和。
更过关于ConcurrentHashMap的源码分析,请参考:
- https://www.cnblogs.com/zerotomax/p/8687425.html#go7
- https://juejin.im/entry/59fc786d518825297f3fa968
- https://juejin.im/post/5b00160151882565bd2582e0
3. Java并发编程
3.1 Java对象头
Java对象头由8字节组成(数组对象则有12字节)。主要包含三部分:
1. Mark Word:4字节。存储hashcode或锁信息。
2. Class Metadata Address: 4字节。存储了指向对象数据类型的指针。
3. Array Length(数组对象才包含此部分):存储数组长度。
对象头.jpg
3.2 Synchronized关键字
synchronized锁升级过程
synchronized锁升级过程
synchronized锁流程如下:
1. 检查MarkWord里面是不是自己的ThreadId ,如果是,表示当前线程是处于 “偏向锁”。
2. 如果MarkWord不是自己的ThreadId,锁升级,这时候,用CAS来执行切换,新的线程根据MarkWord里面现有的ThreadId,通知之前线程暂停,之前线程将Markword的内容置为空。
3. 两个线程都把对象的HashCode复制到自己新建的用于存储锁的记录空间,接着开始通过CAS操作,把共享对象的MarKword的内容修改为自己新建的记录空间的地址的方式竞争MarkWord。
4. 第三步中成功执行CAS的获得资源,失败的则进入自旋。
5. 自旋的线程在自旋过程中,成功获得资源(即之前获的资源的线程执行完成并释放了共享资源),则整个状态依然处于 轻量级锁的状态,如果自旋失败。
6. 进入重量级锁的状态,这个时候,自旋的线程进行阻塞状态,等待前一个线程释放资源。
3.3 线程状态转化
Java线程状态图.JPG
Java中的线程状态主要有6种:
1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象调用start()方法后,该线程就处于就绪(ready)状态。此时它等待被线程调度选中,获取CPU的使用权。就绪(ready)状态的线程在获得CPU时间片后才真正变为运行中状态(running)。
3. 阻塞(BLOCKED):表示线程由于无法获得锁而处于阻塞状态。此时所有被阻塞的线程处于一个同步队列中。
4. 等待(WAITING):线程同样处于阻塞状态,然而此时被阻塞的线程处于等待队列中。正在运行的线程调用Object.wait(),Thread.join()方法后,线程就会处于等待状态。
5. 超时等待(TIMED_WAITING):可以在指定的时间后自行返回的等待状态。
6. 终止(TERMINATED):表示该线程已经执行完毕。
Java中常用线程状态转换API的区别:
1. Thread.sleep(long millis):当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。
2. Thread.yield():当前线程让出CPU时间片,但不释放锁资源,由运行状态变为就绪状态,此时由OS来分配此线程让出的时间片(可能依旧分配给此线程)。
3. thread.join()/thread.join(long millis):当前线程里调用其它线程t的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程t执行完毕或者millis时间到,当前线程一般情况下进入RUNNABLE状态,也有可能进入BLOCKED状态(因为join是基于wait实现的)。
4. object.wait():当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
5. LockSupport.park()/LockSupport.parkNanos(long nanos),LockSupport.parkUntil(long deadlines),:当前线程进入WAITING/TIMED_WAITING状态。对比wait方法,不需要获得锁就可以让线程进入WAITING/TIMED_WAITING状态,需要通过LockSupport.unpark(Thread thread)唤醒。
3.4 Lock
Java中的Lock接口提供了与synchronized关键字类似的功能,只是在使用时需要显示的获取和释放锁。下面是Lock接口的主要方法:
public interface Lock { void lock(); //获取锁 void lockInterruptibly() throws InterruptedException; //可中断的获取锁;此方法会被Thread.interrupt()方法中断 boolean tryLock(); //非阻塞的获取锁;此方法会立刻返回,true表示成功获取了锁 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //在指定时间内获取锁 void unlock(); //释放锁 //构造与Lock关联的condition,用于实现等待唤醒 Condition newCondition();
虽然Lock接口缺少了(synchronized块或方法所提供的)隐式获取和释放锁的便捷性,但是却拥有了锁获取与释放的可操作性,可中断的获取锁以及超时获取锁等synchronized关键字所不具备的同步特性。
常用的Lock接口实现类包括ReentrantLock(可重入锁),ReentrantReadWriteLock.ReadLock和ReentrantReadWriteLock.WriteLock。
3.5 AbstractQueuedSynchronizer
队列同步器AbstractQueuedSynchronizer,使用来构建锁或者其他同步组件的基础框架。Java中Lock的实现类都是通过聚合一个重写的AbstractQueuedSynchronizer来实现的。
AbstractQueuedSynchronizer中常用的可重写的方法如下:
protected boolean tryAcquire(int arg):独占式的获取同步状态。实现该方法需要判断同步状态是否符合预期,然后进行CAS设置同步状态。 protected boolean tryRelease(int arg):独占式释放同步状态。等待获取同步状态的线程将有机会获取同步状态。 protected boolean tryAcquireShared(int arg):共享式获取同步状态。返回大于0的值,表示获取成功,反之,获取失败。 protected boolean tryReleaseShared(int arg):共享式释放同步状态 protected boolean isHeldExclusively():是否被当前线程独占
同步器提供的方法主要分为3类:独占式获取和释放同步状态,共享式获取和释放同步状态,查询同步队列中的等待线程情况。通过重写这些方法,就能实现一个同步组件。下面是ReentrantLock的实现:
public class ReentrantLock implements Lock { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //omit... } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } //omit... }
ReentrantLock内部通过NonfairSync,FairSync重写了AbstractQueuedSynchronizer,从而实现了公平可重入锁和非公平可重入锁。
重入锁(ReentrantLock),就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平性选择。如果在绝对时间上,先对锁进行请求的锁先被满足,那么这个锁就是公平的,反之,就是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获得锁,也可以说锁的获取是顺序的。事实上,公平锁往往没有非公平锁的效率高(公平锁造成大量的线程切换开销),但是公平锁能够减少“饥饿”发生的概率。非公平锁虽然可能造成线程饥饿,但极少的线程切换,保证了其更大的吞吐量。
下面是AbstractQueuedSynchronizer内部的主要数据结构:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer { private transient volatile Node head; private transient volatile Node tail; private volatile int state; //omit... static final class Node { volatile Node prev; volatile Node next; volatile int waitStatus; volatile Thread thread; //omit... } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } //独占式获取锁 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //独占式释放锁 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //共享式获取锁 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // 执行获取锁失败的逻辑 } //共享式释放锁 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); // 执行释放锁 return true; } return false; } //独占式获取锁方法,应通过子类重写 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占式释放锁方法,应通过子类重写 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享式获取锁方法,应通过子类重写 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享式释放锁方法,应通过子类重写 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } }
AbstractQueuedSynchronizer的主要数据结构有:
1. volatile变量state,来进行多个线程间的数据共享,客户端通过boolean compareAndSetState(int expect, int update)
来控制boolean tryAcquire(int arg)
,tryAcquireShared(int arg)
等行为。
2. 内部维护了一个等待队列,通过volatile Node head
和volatile Node tail
头尾指针来访问。
AbstractQueuedSynchronizer还提供了一些模板方法和抽象方法,来屏蔽底层的线程排队和唤醒等待等操作。用户仅仅需要重写boolean tryAcquire(int arg)
,boolean tryRelease(int arg)
或tryAcquireShared(int arg)
,boolean tryReleaseShared(int arg)
就可以实现一个排他锁或共享锁。
下面是独占锁获取方法void acquire(int arg)
的主要实现:
1. 通过 tryAcquire(int arg) 方法尝试获取锁,这个方法需要实现类进行实现。返回值为true则方法直接结束,返回值为false则执行后面加入等待队列的逻辑。
**
- 如果tryAcquire(int arg) 方法返回false,则执行 addWaiter(Node.EXCLUSIVE) 方法将当前线程封装成一个 Node 节点对象,并加入队列尾部。**
3. 把当前线程执行封装成 Node 节点后,继续执行 acquireQueued 的逻辑。acquireQueued方法判断该节点的前置节点是否为head,如果是head,则重新调用 tryAcquire(int arg) 方法尝试获取锁。如果前置节点不为head或者它尝试获取锁失败,则通过LockSupport.park(this)
方法阻塞当前线程,从而实现线程等待。
如果当前节点的前置节点为head,那么很有可能head节点已经释放锁了。因此不直接进入等待队列,而是再调用一次tryAcquire(int arg) 尝试获取锁,获取失败再进入等待队列。
而独占锁释放方法boolean release(int arg)
的逻辑就比较简单了:
1. 首先调用boolean tryRelease(int arg)
方法,这个方法需要实现类进行实现。
2. 如果boolean tryRelease(int arg)
方法返回true,则通过unparkSuccessor(h)
开始唤醒等待队列中下一个节点。
下面是共享锁获取方法void acquireShared(int arg)
的实现:
1. 通过 tryAcquire(int arg) 方法尝试获取锁,这个方法需要实现类进行实现。返回值小于0则表示获取失败,执行doAcquireShared(int arg)
方法加入队列中。
2. doAcquireShared(int arg)
方法首先调用addWaiter(Node.SHARED)方法将当前线程封装成一个 Node 节点对象,并加入队列尾部。然后判断判断此节点的前置节点是否为head。如果是head节点,则调用tryAcquireShared(arg)
方法再次尝试获取锁,如果获取成功,则会调用 setHeadAndPropagate 方法同时唤醒后继节点,从而实现共享模式。如果获取锁失败,则通过LockSupport.park(this)
方法阻塞当前线程,从而实现线程等待。
setHeadAndPropagate 方法会将当前节点设置为新的头节点,再调用doReleaseShared方法唤醒后继节点,这是共享锁与独占锁最大的区别。独占锁获取锁之后就结束了,而共享锁则则会唤醒后继节点,后继节点继续尝试获取锁。而独占锁的释放也只会唤醒后继节点,而共享锁的释放则会遍历整个Node队列,然后通过
LockSupport.park(node.thread)
唤醒所有等待线程,被唤醒线程再重新开始新的锁竞争。
下面是共享锁释放方法boolean releaseShared(int arg)
的实现如下:
1. 首先调用boolean tryReleaseShared(int arg)
方法,这个方法需要实现类进行实现。
2. 如果boolean tryReleaseShared(int arg)
方法返回true,则执行doReleaseShared()
方法进行队列修改和线程唤醒操作。
3. doReleaseShared()
方法会从head节点开始遍历整个Node队列,通过unparkSuccessor(node)
方法依次唤醒队列中的等待线程。
更多关于AQS的实现原理,请参考:http://objcoding.com/2019/05/05/aqs-exclusive-lock/