深入理解AQS队列同步器原理-从ReentrantLock的非公平独占锁实现来看AQS的原理

简介: 深入理解AQS队列同步器原理-从ReentrantLock的非公平独占锁实现来看AQS的原理

一、AQS的引入

AQS全称AbstractQueuedSynchronize即队列同步器,是JUC下的一个框架。

本教程从ReentrantLock的非公平独占锁来看AQS的原理。本文源码采用JDK17

1.1前置知识

创建队列的三种方式:继承Thread类;实现Runnable接口;实现Callable接口。

LockSupport的使用:因为AQS的线程阻塞和唤醒依赖这个类。LockSupport。park()/LockSupport.unpark()。

LockSupport是一个工具类,提供了基本得线程阻塞和唤醒功能,他是创建和其他同步组件得基础工具,内部使用Unsafe实现,LockSupport和使用他得线程会关联一个许可,park表示消耗一个许可,如果许可存在,park方法返回,没有许可就一直阻塞到许可可用,unpark会增加一个许可,多次调用不会积累许可,因为许可最大值为1.

ReentrantLock的简单使用:obj.lock()/obj.unlock()。

设计模式之模板方法:模板类是抽象方法,里面存在大量通用方法,差异方法采用抽象方法实现,给具体得子类实现。

1.2 Lock接口

ReentrantLock.lock()方法实现了Lock接口

Lock接口存在多个方法,lockInterruptibly响应中断得锁,针对Synchronized实现,因为Synchronized不可以响应中断;tryLock非阻塞获取锁

1.3 ReentrantLock类

Lock接口有多个实现类,我们看ReentrantLock得lock实现。

先给出结构图,可以看见有三个子类,lock()通过Sync实现,Sync有NonfairSync和FairSync两个实现类,也就是公平的和非公平的。

公平锁:按照队列顺序去获取锁

非公平锁:多个线程尝试获取,获取不到进入等待队列

sync的实现默认是非公平的

syn的lock调用了一个抽象方法initialTryLock,initialTryLock是NonfairSync的方法,

NonfairSync继承了Syn实现initialTryLock

final boolean initialTryLock() {
// 获取当前线程
            Thread current = Thread.currentThread();
            // AQS的方法
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
            
                setExclusiveOwnerThread(current);
                return true;
            } else if (getExclusiveOwnerThread() == current) {
            // AQS的方法
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    // AQS的方法
                setState(c);
                return true;
            } else
                return false;
        }

至此AQS引入成功

总结:ReentrantLock的lock()方法内部实现了三个内部类,sync继承AQS,定义抽象的lock实现,再由两个子类去具体实现,也是通过AQS实现

二、AQS的核心原理

同样先给出类的结构图,方法太多没有全部截屏

2.1 state

使用volatile修饰,解决多线程的可见性问题,state在不同子类中含义不同,ReentrantLock中0表示未加锁,1表示加锁,大于1表示锁重入,state有三个方法修改,分别是get/set/cas,都是final修饰

2.2 node节点

前后指针,线程变量,status标识

获取不到锁的线程(阻塞的线程)会被打包到Node节点里面,组成双向链表也就是队列!cas获取锁失败加入链表,释放锁的时候从链表删除

三、AQS类定义与Node数据结构详解

AQS继承自AbstractOwnableSynchronizer,里面包含一个当前获取锁的线程和get/set方法

3.1 Node节点详细定义

两个重要指针,双向链表的头尾指针

两个静态类,也就是AQS支持的两种模式,共享和排他

还存在一个conditionNode类继承了Node,nextWaiter指向下一个指针,JDK9和JDK17在这里实现不同,JDK9是Node没有子类,采用nextWaiter标记共享排他,并且存在三个构造函数,这里采用三个子类实现

四、Acqurie代码

AQS默认的非公平是如何实现的,tryacqurie和acqurie都是AQS的方法

final void lock() {
      // 加锁失败设置执行acquire
      // jdk17在此处优化,旧版本直接set,可重入在其他地方实现
      // 这里获得锁就会执行lock()后面的函数
            if (!initialTryLock())
                acquire(1);
        }
final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            // 尝试获取锁,获取到了用父类方法将当前线程设置为获得锁的线程
            // 0无锁 1加锁
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                setExclusiveOwnerThread(current);
                return true;
                // 如果就是当前线程获得锁了,那么就state+1,说明AQS是可重入锁
            } else if (getExclusiveOwnerThread() == current) {
                int c = getState() + 1;
                // 小于0,抛出异常,重入次数超过int最大值会抛出
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }
public final void acquire(int arg) {
    // 这里直接抛出异常 取非执行acqurie
        if (!tryAcquire(arg))
        // 参数:node节点 是否共享 是否中断 两个时间相关的
            acquire(null, arg, false, false, false, 0L);
    }
// AQS除了标识加锁无锁之外还应该提供其他状态 这里留给子类实现,
// ReentrantLock没有实现而已,不用抽象类是因为不是所有的子类都要实现
// 见后文思考
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

问题思考

AQS是抽象类为什么子类应该实现的方法没有一个是抽象方法:

AQS是为其他同步组件提供一个强大的基础,不希望别人直接拿来使用,也就是不希望直接new,而是继承实现,所以定义为抽象类。有的方法子类不需要使用,如果用抽象方法实现那么子类必须实现,否则子类又是一个抽象方法,增大了代码的冗余,所以定义为保护方法,可以按需实现。

acquire实现比较复杂,JDK17将各种同步做到了一起,没有很好的封装,并且放弃使用CAS,但是真的晦涩,但是主要还是打包线程放入双向链表,只作简单注释

final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        // 获取当前线程
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued
        for (;;) {
        // 如果是头节点返回head,否则返回null
        // 如果不是头节点,node.pred是当前节点的前驱节点
        // 也就是JDK源码敢这么写吧,可读性真的...
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                // 重入过多,小于0,清楚队列,cleanQueeu纯纯算法
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                // 前前节点是空,onSpinWait是Thread的空方法
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
            // 这里才是理想情况应该进来的
                boolean acquired;
                try {
                    if (shared)
                    // 共享锁执行,tryAcquireShared同样是子类实现
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                    // 非共享锁执行
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                // 取消尝试获取锁
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {
            // 这里做了共享锁和排他锁的不同实现,只是一个标识而已
            // 他们都是继承了Node,没有添加任何东西
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                // Node提供的一系列原子操作
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                // 这里用到了前置知识,给了一个许可
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        // 取消尝试获取锁
        return cancelAcquire(node, interrupted, interruptible);
    }

五、总结与写在最后

AQS是Java提供的一个线程安全的同步器,用于管理可重入的线程访问共享资源的顺序。支持不同的同步状态:可中断等待、非阻塞等待、无锁等待。具有可扩展性和灵活性,而且能够保证并发访问时的线程安全性。它适用于许多高并发、共享资源的场景,例如计数器、消息队列等。在使用 AQS 队列同步器时,开发者需要在对共享资源的操作上使用相应的同步器来保证线程安全。AQS由state标识,双向链表和CAS组成,调用了LockSupport的park和unpark功能,ReentrantLock是典型实现。

JDK17的源码真的看不懂,建议读者阅读JDK8/9版本

码文不易,点个赞吧😍

相关文章
|
7月前
|
存储 监控
多线程之AQS独占锁
多线程之AQS独占锁
59 0
|
安全 Java
【深入理解同步器AQS】
【深入理解同步器AQS】
126 0
|
4月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
95 0
|
5月前
|
存储 设计模式 安全
(五)深入剖析并发之AQS独占锁&重入锁(ReetrantLock)及Condition实现原理
在我们前面的文章《[深入理解Java并发编程之无锁CAS机制》中我们曾提到的CAS机制如果说是整个Java并发编程基础的话,那么本章跟大家所讲述的AQS则是整个Java JUC的核心。不过在学习AQS之前需要对于CAS机制有一定的知识储备,因为CAS在ReetrantLock及AQS中的实现随处可见。
|
存储 安全 Java
aqs原理初探以及公平锁和非公平锁实现
aqs原理初探以及公平锁和非公平锁实现
311 0
|
安全 Java
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
97 0
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
|
算法 安全 Java
深入浅出理解Java并发AQS的独占锁模式
深入浅出理解Java并发AQS的独占锁模式
142 0
深入浅出理解Java并发AQS的独占锁模式
AQS的应用:基于AQS实现自定义同步器
AQS的应用:基于AQS实现自定义同步器
212 0
AQS——条件变量的支持
AQS——条件变量的支持
113 0
下一篇
DataWorks