AtomicInteger 底层实现原理是什么?如何在自己代码中应用 CAS 操作?
AtomicInteger 是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性的操作实现是基于 CAS (compare-and-swap)技术。
CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS 指令试图进行更新,如果当前数值不变,代码没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就反应一个成功或者失败的结果。
ActomicInteger 实现原理
ActomicInteger 的内部属性可以看到,它是依赖 Unsafe 的一些底层能力,进行底层操作,以 volatile 的 value 字段,记录数值,以保证可见性。
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
Unsafe 会利用 value 字段的内存地址便宜,直接完成操作。
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
getAndIncrement 是需要明确返回值的,因此 getAndAddInt 实现是需要失败重试,最后拿到返回值的。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
CompareAndset 这样的直接返回 Boolean 值,不需要失败重试。
CAS 底层如何实现
CAS 底层实现,依赖 CPU 特定指令, 具体根据体系的不同还存在明显的区别。例如,x86 CPU 提供 cmpxchg 指令。而在精简指令集的体系架构中,则通常是靠一对儿指令(如" load and reserve"和" store conditional")实现的,在大多数处理器上CAS都是个非常轻量级的操作,这也是其优势所在。
CAS 使用场景
可以设想这样一个场景:在数据库产品中,为保证索引的一致性,一个常见的选择是,保证只有一个线程能够排他性地修改一个索引分区,如何在数据库抽象层实现?
可以考虑为索引分区对象添加一个逻辑上的锁,例如,以当前独占的线程ID作为锁的数值,然后通过原子操作设置lock数值,来实现加锁和释放锁,伪代码如下:
public class AtomicBTreePartition { private volatile long lock; public void acquireLock(); public void releaseeLock(); }
使用 AtomicLongFieldUpdater 替代 Unsafe
那么在Java代码中,我们怎么实现锁操作呢?Unsafe 似乎不是个好的选择,例如,我就注意到类似 Cassandra等产品,因为Java9中移除了 Unsafe.moniter Enter()/moniterEXit(),导致无法平滑升级到新的JDK版本。目前Java提供了两种公共API,可以实现这种CAS操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射机制创建,我们需要保证类型和字段名称正确。
private static final AtomicLongFieldupdater<AtomicBTreePartition> lockFieldUpdater = AtomicLongFieldupdater.newUpdater(AtomicBTreePartition. class,"lock"); private void acquireLock(){ long t= Thread currentThread().getId(); while(!lockFieldUpdater.compareAndSet(this, 0L, t)){ // 等待一会儿数据库架可比较慢 ... } }
VariableHadnle 替换 Unsafe
如果是Java9以后,我们完全可以釆用另外一种方式实现,也就是 Variable handle api,这是源自于JEp193,提供了各种粒度的原子或者有序性的操作等。
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle(AtomicBTreePartition. class, "lock"); private void acquireLock(){ long t= Thread currentThread().getId(); while(!HANDLE.compareAndSet(this, 0L, t)){ // 等待一会儿数据库架可比较慢 ... } }
CAS 副作用
- 可能过度消耗 CPU 试想,其常用的失败重试机制,隐含着一个假设,即假设竞争情况是短暂的。大多数应用场景中,确实大部分重试只会发生一次就获得了成功,但是总是有意外情况,所以在有需要的时候,还是要考虑限制自旋的次数,以免过度消耗CPU
- ABA 问题 这是通常只在lock-free算法下暴露的问题。我前面说过CAS是在更新时比较前值,如果对方只是恰好相同,例如期间发生了A->B->A的更新,仅仅判断数值是A,可能导致不合理的修改操作。针对这种情况,Java提供了 AtomicStampedReference工具类,通过为引用建立类似版本号(stamp)的方式,来保证CAS的正确性,具体用法请參考这里的介绍
- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicStampedReference.html
- http://tutorials.jenkov.com/java-util-concurrent/atomicstampedreference.html
String initialRef = "initial value referenced"; int initialStamp = 0; AtomicStampedReference<String> atomicStringReference = new AtomicStampedReference<String>( initialRef, initialStamp ); String newRef = "new value referenced"; int newStamp = initialStamp + 1; boolean exchanged = atomicStringReference .compareAndSet(initialRef, newRef, initialStamp, newStamp); System.out.println("exchanged: " + exchanged); //true exchanged = atomicStringReference .compareAndSet(initialRef, "new string", newStamp, newStamp + 1); System.out.println("exchanged: " + exchanged); //false exchanged = atomicStringReference .compareAndSet(newRef, "new string", initialStamp, newStamp + 1); System.out.println("exchanged: " + exchanged); //false exchanged = atomicStringReference .compareAndSet(newRef, "new string", newStamp, newStamp + 1); System.out.println("exchanged: " + exchanged); //true
AbstractQueuedSynchronizer (AQS)
理解为什么需要AQS,如何使用AQS,至少要做什么,再进一步结合JDK源代码中的实践,理解AQS的原理与应用Doug Lea曾经介绍过AQS的设计初衷。比如 Semaphore 就选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用AQS为我们构建冋步结构握供了范本。
AQS 内部数据和方法
- 一个 volatile的整数成员表征状态,同时提供了 setstate和 getstate方法
private volatile int state
- 一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是AQS机制的核心之一
- 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的 acquire/ release方法。
利用AQS实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire操作,获取资源的独占权;还有就是 release操作,释放对某个资源的独占。
AQS 的应用场景
ReentrantLock 就是基于AQS 实现的
private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
ReentrantLock 的 acquire ,release操作
public void lock() { sync.lock(); } public void unlock() { sync.release(1); }
看 sync aquire 内部实现。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
排除掉一些细节,整体地分析 acquire方法逻辑,其直接实现是在AQS内部,调用了 tryAcquire 和 acquireQueued,这是两个需要搞淸楚的基本部分。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
以非公平的 tryAcquire为例,其内部实现了如何配合状态与CAS获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的。
非公平版本:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平版本的实现会检查队列中是否有等待者
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
接下来我再来分析 acquireQueued,如果前面的 tryAcquire失败,代表着锁争抢失败,进入排队竞争阶段。这里就是我们所说的,利用FIFO队列,实现线程间对锁的竞争的部分算是AQS的核心逻辑。
acquireQueued 实现原理
当前线程会被包装成为一个排他模式的节点( EXCLUSIVE),通过 addWaiter方法添加到队列中。acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获取锁,一切顺利则成为新的头节点;否则,有必要则等待,具体处理逻辑请參考我添加的注释。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {// 循环 final Node p = node.predecessor();// 获取前一个节点 if (p == head && tryAcquire(arg)) {// 如果前一个节点是头结点,标识当前节点合适 tryAcquire setHead(node);// acuire 成功,则设置新头节点。 p.next = null; // help GC 将前面节点对当前节点的引用清空 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 检查是否失败后需要park, 然后循环去入队 interrupted = true; } } finally { if (failed) cancelAcquire(node); // 出现一次,取消 } } // 入队逻辑 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
到这里线程试图获取锁的过程基本展现出来了,tryAcquire是按照特定场景需要开发者去实现的部分,而线程间竞争则是AQS通过Waiter队列与 acquire在 release方法中,同样会对队列进行对应操作。