AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作

简介: AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作

AtomicInteger 底层实现原理是什么?如何在自己代码中应用 CAS 操作?


AtomicInteger 是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性的操作实现是基于 CAS (compare-and-swap)技术。

CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS 指令试图进行更新,如果当前数值不变,代码没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就反应一个成功或者失败的结果。


ActomicInteger 实现原理


ActomicInteger 的内部属性可以看到,它是依赖 Unsafe 的一些底层能力,进行底层操作,以 volatile 的 value 字段,记录数值,以保证可见性。


private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;

Unsafe 会利用 value 字段的内存地址便宜,直接完成操作。


public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }

getAndIncrement 是需要明确返回值的,因此 getAndAddInt 实现是需要失败重试,最后拿到返回值的。


public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

CompareAndset 这样的直接返回 Boolean 值,不需要失败重试。


CAS  底层如何实现


CAS 底层实现,依赖 CPU 特定指令, 具体根据体系的不同还存在明显的区别。例如,x86 CPU  提供 cmpxchg 指令。而在精简指令集的体系架构中,则通常是靠一对儿指令(如" load and reserve"和" store conditional")实现的,在大多数处理器上CAS都是个非常轻量级的操作,这也是其优势所在。


CAS 使用场景


可以设想这样一个场景:在数据库产品中,为保证索引的一致性,一个常见的选择是,保证只有一个线程能够排他性地修改一个索引分区,如何在数据库抽象层实现?

可以考虑为索引分区对象添加一个逻辑上的锁,例如,以当前独占的线程ID作为锁的数值,然后通过原子操作设置lock数值,来实现加锁和释放锁,伪代码如下:


public class AtomicBTreePartition {
    private volatile long lock;
    public void acquireLock();
    public void releaseeLock();
}

使用 AtomicLongFieldUpdater 替代 Unsafe


那么在Java代码中,我们怎么实现锁操作呢?Unsafe 似乎不是个好的选择,例如,我就注意到类似 Cassandra等产品,因为Java9中移除了 Unsafe.moniter Enter()/moniterEXit(),导致无法平滑升级到新的JDK版本。目前Java提供了两种公共API,可以实现这种CAS操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射机制创建,我们需要保证类型和字段名称正确。


private static final AtomicLongFieldupdater<AtomicBTreePartition> lockFieldUpdater = AtomicLongFieldupdater.newUpdater(AtomicBTreePartition. class,"lock");
private void acquireLock(){
   long t= Thread currentThread().getId();
   while(!lockFieldUpdater.compareAndSet(this, 0L, t)){
      // 等待一会儿数据库架可比较慢
      ...
   }
}

VariableHadnle  替换 Unsafe


如果是Java9以后,我们完全可以釆用另外一种方式实现,也就是 Variable handle api,这是源自于JEp193,提供了各种粒度的原子或者有序性的操作等。


private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle(AtomicBTreePartition. class, "lock");
private void acquireLock(){
   long t= Thread currentThread().getId();
   while(!HANDLE.compareAndSet(this, 0L, t)){
      // 等待一会儿数据库架可比较慢
      ...
   }
}

CAS 副作用


  1. 可能过度消耗 CPU 试想,其常用的失败重试机制,隐含着一个假设,即假设竞争情况是短暂的。大多数应用场景中,确实大部分重试只会发生一次就获得了成功,但是总是有意外情况,所以在有需要的时候,还是要考虑限制自旋的次数,以免过度消耗CPU
  2. ABA 问题 这是通常只在lock-free算法下暴露的问题。我前面说过CAS是在更新时比较前值,如果对方只是恰好相同,例如期间发生了A->B->A的更新,仅仅判断数值是A,可能导致不合理的修改操作。针对这种情况,Java提供了 AtomicStampedReference工具类,通过为引用建立类似版本号(stamp)的方式,来保证CAS的正确性,具体用法请參考这里的介绍


String initialRef   = "initial value referenced";
int    initialStamp = 0;
AtomicStampedReference<String> atomicStringReference =
    new AtomicStampedReference<String>(
        initialRef, initialStamp
    );
String newRef   = "new value referenced";
int    newStamp = initialStamp + 1;
boolean exchanged = atomicStringReference
    .compareAndSet(initialRef, newRef, initialStamp, newStamp);
System.out.println("exchanged: " + exchanged);  //true
exchanged = atomicStringReference
    .compareAndSet(initialRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //false
exchanged = atomicStringReference
    .compareAndSet(newRef, "new string", initialStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //false
exchanged = atomicStringReference
    .compareAndSet(newRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //true

AbstractQueuedSynchronizer (AQS)


理解为什么需要AQS,如何使用AQS,至少要做什么,再进一步结合JDK源代码中的实践,理解AQS的原理与应用Doug Lea曾经介绍过AQS的设计初衷。比如 Semaphore 就选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用AQS为我们构建冋步结构握供了范本。


640.png



AQS 内部数据和方法


  • 一个 volatile的整数成员表征状态,同时提供了 setstate和 getstate方法
private volatile int state
  • 一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是AQS机制的核心之一
  • 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的 acquire/ release方法。

利用AQS实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire操作,获取资源的独占权;还有就是 release操作,释放对某个资源的独占。


AQS  的应用场景


ReentrantLock 就是基于AQS 实现的


private final Sync sync;
/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    // Methods relayed from outer class
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }
    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }
    final boolean isLocked() {
        return getState() != 0;
    }
    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

ReentrantLock 的 acquire ,release操作


public void lock() {
    sync.lock();
}
 public void unlock() {
    sync.release(1);
}

看 sync aquire 内部实现。


public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

排除掉一些细节,整体地分析 acquire方法逻辑,其直接实现是在AQS内部,调用了 tryAcquireacquireQueued,这是两个需要搞淸楚的基本部分。


public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

以非公平的 tryAcquire为例,其内部实现了如何配合状态与CAS获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的。


非公平版本:


final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

公平版本的实现会检查队列中是否有等待者


protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }


接下来我再来分析 acquireQueued,如果前面的 tryAcquire失败,代表着锁争抢失败,进入排队竞争阶段。这里就是我们所说的,利用FIFO队列,实现线程间对锁的竞争的部分算是AQS的核心逻辑。


acquireQueued 实现原理


当前线程会被包装成为一个排他模式的节点( EXCLUSIVE),通过 addWaiter方法添加到队列中。acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获取锁,一切顺利则成为新的头节点;否则,有必要则等待,具体处理逻辑请參考我添加的注释。


final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 循环
                final Node p = node.predecessor();// 获取前一个节点
                if (p == head && tryAcquire(arg)) {// 如果前一个节点是头结点,标识当前节点合适 tryAcquire
                    setHead(node);// acuire 成功,则设置新头节点。
                    p.next = null; // help GC 将前面节点对当前节点的引用清空
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())// 检查是否失败后需要park, 然后循环去入队
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);  // 出现一次,取消
        }
    }
//  入队逻辑
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


到这里线程试图获取锁的过程基本展现出来了,tryAcquire是按照特定场景需要开发者去实现的部分,而线程间竞争则是AQS通过Waiter队列与 acquire在 release方法中,同样会对队列进行对应操作。

相关文章
|
3天前
|
人工智能 自然语言处理 Shell
深度评测 | 仅用3分钟,百炼调用满血版 Deepseek-r1 API,百万Token免费用,简直不要太爽。
仅用3分钟,百炼调用满血版Deepseek-r1 API,享受百万免费Token。阿里云提供零门槛、快速部署的解决方案,支持云控制台和Cloud Shell两种方式,操作简便。Deepseek-r1满血版在推理能力上表现出色,尤其擅长数学、代码和自然语言处理任务,使用过程中无卡顿,体验丝滑。结合Chatbox工具,用户可轻松掌控模型,提升工作效率。阿里云大模型服务平台百炼不仅速度快,还确保数据安全,值得信赖。
129305 24
深度评测 | 仅用3分钟,百炼调用满血版 Deepseek-r1 API,百万Token免费用,简直不要太爽。
|
5天前
|
人工智能 API 网络安全
用DeepSeek,就在阿里云!四种方式助您快速使用 DeepSeek-R1 满血版!更有内部实战指导!
DeepSeek自发布以来,凭借卓越的技术性能和开源策略迅速吸引了全球关注。DeepSeek-R1作为系列中的佼佼者,在多个基准测试中超越现有顶尖模型,展现了强大的推理能力。然而,由于其爆火及受到黑客攻击,官网使用受限,影响用户体验。为解决这一问题,阿里云提供了多种解决方案。
16178 37
|
13天前
|
机器学习/深度学习 人工智能 自然语言处理
PAI Model Gallery 支持云上一键部署 DeepSeek-V3、DeepSeek-R1 系列模型
DeepSeek 系列模型以其卓越性能在全球范围内备受瞩目,多次评测中表现优异,性能接近甚至超越国际顶尖闭源模型(如OpenAI的GPT-4、Claude-3.5-Sonnet等)。企业用户和开发者可使用 PAI 平台一键部署 DeepSeek 系列模型,实现 DeepSeek 系列模型与现有业务的高效融合。
|
4天前
|
并行计算 PyTorch 算法框架/工具
本地部署DeepSeek模型
要在本地部署DeepSeek模型,需准备Linux(推荐Ubuntu 20.04+)或兼容的Windows/macOS环境,配备NVIDIA GPU(建议RTX 3060+)。安装Python 3.8+、PyTorch/TensorFlow等依赖,并通过官方渠道下载模型文件。配置模型后,编写推理脚本进行测试,可选使用FastAPI服务化部署或Docker容器化。注意资源监控和许可协议。
1190 8
|
13天前
|
人工智能 搜索推荐 Docker
手把手教你使用 Ollama 和 LobeChat 快速本地部署 DeepSeek R1 模型,创建个性化 AI 助手
DeepSeek R1 + LobeChat + Ollama:快速本地部署模型,创建个性化 AI 助手
3347 117
手把手教你使用 Ollama 和 LobeChat 快速本地部署 DeepSeek R1 模型,创建个性化 AI 助手
|
8天前
|
人工智能 自然语言处理 API
DeepSeek全尺寸模型上线阿里云百炼!
阿里云百炼平台近日上线了DeepSeek-V3、DeepSeek-R1及其蒸馏版本等六款全尺寸AI模型,参数量达671B,提供高达100万免费tokens。这些模型在数学、代码、自然语言推理等任务上表现出色,支持灵活调用和经济高效的解决方案,助力开发者和企业加速创新与数字化转型。示例代码展示了如何通过API使用DeepSeek-R1模型进行推理,用户可轻松获取思考过程和最终答案。
|
5天前
|
人工智能 自然语言处理 程序员
如何在通义灵码里用上DeepSeek-V3 和 DeepSeek-R1 满血版671B模型?
除了 AI 程序员的重磅上线外,近期通义灵码能力再升级全新上线模型选择功能,目前已经支持 Qwen2.5、DeepSeek-V3 和 R1系列模型,用户可以在 VSCode 和 JetBrains 里搜索并下载最新通义灵码插件,在输入框里选择模型,即可轻松切换模型。
853 14
|
12天前
|
API 开发工具 Python
阿里云PAI部署DeepSeek及调用
本文介绍如何在阿里云PAI EAS上部署DeepSeek模型,涵盖7B模型的部署、SDK和API调用。7B模型只需一张A10显卡,部署时间约10分钟。文章详细展示了模型信息查看、在线调试及通过OpenAI SDK和Python Requests进行调用的步骤,并附有测试结果和参考文档链接。
1875 9
阿里云PAI部署DeepSeek及调用
|
9天前
|
人工智能 数据可视化 Linux
【保姆级教程】3步搞定DeepSeek本地部署
DeepSeek在2025年春节期间突然爆火出圈。在目前DeepSeek的网站中,极不稳定,总是服务器繁忙,这时候本地部署就可以有效规避问题。本文以最浅显易懂的方式带读者一起完成DeepSeek-r1大模型的本地部署。
|
11天前
|
缓存 自然语言处理 安全
快速调用 Deepseek API!【超详细教程】
Deepseek 强大的功能,在本教程中,将指导您如何获取 DeepSeek API 密钥,并演示如何使用该密钥调用 DeepSeek API 以进行调试。

热门文章

最新文章