图解AQS源码分析(下)

简介: AbstractQueuedSynchronizer抽象的队列式同步器(抽象类)。提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。

5 源码分析(以非公平锁为例)


5.1 线程A先抢占锁


构造方法:


public ReentrantLock() {
    sync = new NonfairSync();
}


lock():


public void lock() {
    sync.lock();
}


假设现在有两个线程A和B,如下:


static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    // 线程A进来
    // 线程B进来
    final void lock() {
        // 线程A执行成功,将AQS.state置为1,表示已抢占该锁
        // 线程B,由于线程A已将AQS.state置为1,所以线程B执行CAS操作为false
        if (compareAndSetState(0, 1))
            // 线程A,将AbstractOwnableSynchronizer.exclusiveOwnerThread设置为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 线程B执行该部分,尝试获取一个锁
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


其中acquire(1)方法的源码如下:


// 线程B调用:arg = 1
public final void acquire(int arg) {
    // 线程B
    // 尝试获得非公平锁,由于已被线程A抢到,所以tryAcquire(arg) = false
    // addWaiter方法,构建承载线程B的Node,然后添加到队列末尾,返回该node
    // acquireQueued方法
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
        // 设置当前线程的中断标识
        selfInterrupt();
    }
}


尝试抢锁的方法 tryAcquire(arg) 最终调用的:


/**
 * 尝试抢锁
 *
 * 处理内容:
 *  1 如果抢到锁,返回true
 *      1.1 如果当前线程第一次抢到锁:
 *          AQS.status由0变为1
 *          AQS.exclusiveOwnerThread = Thread.currentThread()
 *          返回true
 *      1.2 如果当前线程再次抢到锁(重入)
 *          AQS.status++
 *          返回true
 *  2 没抢到锁,返回false
 *
 */
// 线程B acquires = 1
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 由于线程A已将state设为1,所以c=1
    int c = getState();
    if (c == 0) {   // 线程B,不满足不执行
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {    // 线程B,不满足不执行
        /**
         * 获得当前独享线程,如果就是当前线程,那么执行重入操作
         * 执行tryLock()时:
         *      如果第二次进入,则nextc = 0 + 1 = 1
         *      如果第三次进入,则nextc = 1 + 1 = 2
         *      如果第四次进入,则nextc = 2 + 1 = 3
         */
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 线程B,返回false
    return false;
}


回到 acquire 代码中 addWaiter 方法:


// 线程B:mode = Node.EXCLUSIVE = null
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail; 
    // 线程B:由于线程A并没有进入链表,所以tail = null不会进入if方法
    if (pred != null) {
        node.prev = pred;   // (老的尾部node) <--- 新node.prev 
        if (compareAndSetTail(pred, node)) {
            pred.next = node;   // (老的尾部node) next ---> (新node)
            return node;
        }
    }
    // 线程B:node = 承载线程B的node
    enq(node);
    return node;
}


addWaiter 方法使用到了 Node 的构造方法:


// 线程B:thread = Thread.currentThread()  mode = null
Node(Thread thread, Node mode) {     
    this.nextWaiter = mode;
    this.thread = thread;
}


对照下面的图来看,nextWaiter 是null,Node的构造方法里没有对 waitStatus 赋值,所以默认为0


20200916111016772.png


回到addWaiter方法,其中还用到了enq方法:


/**
 * 将node节点插入队列末尾
 * 1 如果是空队列,则初始化一个空内容node作为第一个节点,然后将入参node加到队列末尾
 * 2 如果不是空队列,则直接入参node加到队列末尾
 *
 * @param node
 * @return
 */
// 线程B:node = 承载线程B的node
private Node enq(final Node node) {
    for (;;) {
        // 线程B:第一次循环 tail = null
        // 线程B:第二次循环 tail = head = 空内容node
        Node t = tail;
        /**
         * 第一次进入由于队列为null,所以t = null
         * 第二次进入,由于队列已经被初始化1个节点,故 t != null
         */
        if (t == null) {    // 线程B:第一次循环 满足t == null,进入该模块内
            if (compareAndSetHead(new Node()))  // 初始化一个空内容节点,作为AQS.head节点
                tail = head;
        } else {    // 线程B:第二次循环 满足 t != null进入该模块内,即空内容node <-> 新节点
            node.prev = t;  // 老的尾部node <--- 入参node.prev
            if (compareAndSetTail(t, node)) {   // 将AQS.tailOffset内容更新为入参node
 t.next = node;  // 老的尾部node ---> 入参node
                return t;
            }
        }
    }
}


此时addWaiter执行完毕,又回到acquire方法,addWaiter返回了承载B的node,现在要执行acquireQueued方法:


20200916111120168.png


// 线程B:node = 承载B的节点,arg = 1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 线程B:会一直循环,直到拿到锁
        for (;;) {
            // 线程B:p=空内容node(head)
            final Node p = node.predecessor();
            // 线程B:p == head等于true;但是由于线程A先抢到锁,所以tryAcquire(arg)=false,所以不会进入下面的if
            if (p == head && tryAcquire(arg)) { // tryAcquire(arg):抢锁操作
                setHead(node);  // 更新头结点为入参node
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 线程B:shouldParkAfterFailedAcquire方法,设置p节点的waitStatus = Node.SIGNAL(原先是0),如果自旋两次没抢到锁的话会返回true挂起线程
            // 线程B:parkAndCheckInterrupt方法,挂起线程B
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


5.2 线程A释放锁


如果A调用unlock操作:


public void unlock() {
    sync.release(1);
}


释放锁的方法如下:


// 线程A:arg = 1
public final boolean release(int arg) {
    if (tryRelease(arg)) {  // 线程A:AQS.state = 0,AQS.exclusiveOwnerThread = null
        Node h = head;
        // 线程A:满足条件 h.waitStatus  == SIGNAL(值为-1)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


其中调用了tryRelease方法:


// 线程A:release = 1
protected final boolean tryRelease(int releases) {
    // 线程A:c = 1 - 1 = 0
    int c = getState() - releases;
    // 如果当前线程不是之前抢占的线程,则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 线程A:c == 0 为 true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}


对应:


20200916111326654.png


tryRelease执行完毕之后又回到了release方法中,运行unparkSuccessor方法


private void unparkSuccessor(Node node) {
    // 线程A:node.waitStatus == SIGNAL(-1)
    int ws = node.waitStatus;
    if (ws < 0) {
        // 线程A:设置 waitStatus == 0
        compareAndSetWaitStatus(node, ws, 0);
    }
    Node s = node.next;
    // 线程A:s == 线程B,s.waitStatus == SIGNAL(-1),所以不满足
    if (s == null || s.waitStatus > 0) {    // node是tail 或者 node的tail节点waitStatus > 0
        s = null;   // 断开node与node.next的连接
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 线程A:s == 线程B,激活线程B
    if (s != null)
        LockSupport.unpark(s.thread);
}


释放完毕之后,在线程B的lock方法的一些列调用中有 parkAndCheckInterrupt() 方法,即线程B被挂起了,A释放之后又回到被挂起那个位置继续执行,所以线程B的 acquireQueued() 方法可以抢到锁


对应:


20200916111411633.png

相关文章
|
9天前
|
机器人 API 调度
基于 DMS Dify+Notebook+Airflow 实现 Agent 的一站式开发
本文提出“DMS Dify + Notebook + Airflow”三位一体架构,解决 Dify 在代码执行与定时调度上的局限。通过 Notebook 扩展 Python 环境,Airflow实现任务调度,构建可扩展、可运维的企业级智能 Agent 系统,提升大模型应用的工程化能力。
|
人工智能 前端开发 API
前端接入通义千问(Qwen)API:5 分钟实现你的 AI 问答助手
本文介绍如何在5分钟内通过前端接入通义千问(Qwen)API,快速打造一个AI问答助手。涵盖API配置、界面设计、流式响应、历史管理、错误重试等核心功能,并提供安全与性能优化建议,助你轻松集成智能对话能力到前端应用中。
715 154
|
15天前
|
人工智能 数据可视化 Java
Spring AI Alibaba、Dify、LangGraph 与 LangChain 综合对比分析报告
本报告对比Spring AI Alibaba、Dify、LangGraph与LangChain四大AI开发框架,涵盖架构、性能、生态及适用场景。数据截至2025年10月,基于公开资料分析,实际发展可能随技术演进调整。
967 152
|
负载均衡 Java 微服务
OpenFeign:让微服务调用像本地方法一样简单
OpenFeign是Spring Cloud中声明式微服务调用组件,通过接口注解简化远程调用,支持负载均衡、服务发现、熔断降级、自定义拦截器与编解码,提升微服务间通信开发效率与系统稳定性。
366 156
|
7天前
|
分布式计算 监控 API
DMS Airflow:企业级数据工作流编排平台的专业实践
DMS Airflow 是基于 Apache Airflow 构建的企业级数据工作流编排平台,通过深度集成阿里云 DMS(Data Management Service)系统的各项能力,为数据团队提供了强大的工作流调度、监控和管理能力。本文将从 Airflow 的高级编排能力、DMS 集成的特殊能力,以及 DMS Airflow 的使用示例三个方面,全面介绍 DMS Airflow 的技术架构与实践应用。
|
8天前
|
人工智能 自然语言处理 前端开发
Qoder全栈开发实战指南:开启AI驱动的下一代编程范式
Qoder是阿里巴巴于2025年发布的AI编程平台,首创“智能代理式编程”,支持自然语言驱动的全栈开发。通过仓库级理解、多智能体协同与云端沙箱执行,实现从需求到上线的端到端自动化,大幅提升研发效率,重塑程序员角色,引领AI原生开发新范式。
604 81