一、AQS的引入
AQS全称AbstractQueuedSynchronize
即队列同步器,是JUC下的一个框架。
本教程从ReentrantLock
的非公平独占锁来看AQS的原理。本文源码采用JDK17
1.1前置知识
创建队列的三种方式:继承Thread类;实现Runnable接口;实现Callable接口。
LockSupport的使用:因为AQS的线程阻塞和唤醒依赖这个类。LockSupport。park()/LockSupport.unpark()。
LockSupport是一个工具类,提供了基本得线程阻塞和唤醒功能,他是创建和其他同步组件得基础工具,内部使用Unsafe实现,LockSupport和使用他得线程会关联一个许可,park表示消耗一个许可,如果许可存在,park方法返回,没有许可就一直阻塞到许可可用,unpark会增加一个许可,多次调用不会积累许可,因为许可最大值为1.
ReentrantLock的简单使用:obj.lock()/obj.unlock()。
设计模式之模板方法:模板类是抽象方法,里面存在大量通用方法,差异方法采用抽象方法实现,给具体得子类实现。
1.2 Lock接口
ReentrantLock.lock()方法实现了Lock接口
Lock接口存在多个方法,lockInterruptibly响应中断得锁,针对Synchronized实现,因为Synchronized不可以响应中断;tryLock非阻塞获取锁
1.3 ReentrantLock类
Lock接口有多个实现类,我们看ReentrantLock得lock实现。
先给出结构图,可以看见有三个子类,lock()通过Sync实现,Sync有NonfairSync和FairSync两个实现类,也就是公平的和非公平的。
公平锁:按照队列顺序去获取锁
非公平锁:多个线程尝试获取,获取不到进入等待队列
sync的实现默认是非公平的
syn的lock调用了一个抽象方法initialTryLock,initialTryLock是NonfairSync的方法,
NonfairSync继承了Syn实现initialTryLock
final boolean initialTryLock() { // 获取当前线程 Thread current = Thread.currentThread(); // AQS的方法 if (compareAndSetState(0, 1)) { // first attempt is unguarded setExclusiveOwnerThread(current); return true; } else if (getExclusiveOwnerThread() == current) { // AQS的方法 int c = getState() + 1; if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); // AQS的方法 setState(c); return true; } else return false; }
至此AQS引入成功
总结:ReentrantLock的lock()方法内部实现了三个内部类,sync继承AQS,定义抽象的lock实现,再由两个子类去具体实现,也是通过AQS实现
二、AQS的核心原理
同样先给出类的结构图,方法太多没有全部截屏
2.1 state
使用volatile修饰,解决多线程的可见性问题,state在不同子类中含义不同,ReentrantLock中0表示未加锁,1表示加锁,大于1表示锁重入
,state有三个方法修改,分别是get/set/cas,都是final修饰
2.2 node节点
前后指针,线程变量,status标识
获取不到锁的线程(阻塞的线程)会被打包到Node节点里面,组成双向链表也就是队列!cas获取锁失败加入链表,释放锁的时候从链表删除
三、AQS类定义与Node数据结构详解
AQS继承自AbstractOwnableSynchronizer,里面包含一个当前获取锁的线程和get/set方法
3.1 Node节点详细定义
两个重要指针,双向链表的头尾指针
两个静态类,也就是AQS支持的两种模式,共享和排他
还存在一个conditionNode类继承了Node,nextWaiter指向下一个指针,JDK9和JDK17在这里实现不同,JDK9是Node没有子类,采用nextWaiter标记共享排他,并且存在三个构造函数,这里采用三个子类实现
四、Acqurie代码
AQS默认的非公平是如何实现的,tryacqurie和acqurie都是AQS的方法
final void lock() { // 加锁失败设置执行acquire // jdk17在此处优化,旧版本直接set,可重入在其他地方实现 // 这里获得锁就会执行lock()后面的函数 if (!initialTryLock()) acquire(1); } final boolean initialTryLock() { Thread current = Thread.currentThread(); // 尝试获取锁,获取到了用父类方法将当前线程设置为获得锁的线程 // 0无锁 1加锁 if (compareAndSetState(0, 1)) { // first attempt is unguarded setExclusiveOwnerThread(current); return true; // 如果就是当前线程获得锁了,那么就state+1,说明AQS是可重入锁 } else if (getExclusiveOwnerThread() == current) { int c = getState() + 1; // 小于0,抛出异常,重入次数超过int最大值会抛出 if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } else return false; } public final void acquire(int arg) { // 这里直接抛出异常 取非执行acqurie if (!tryAcquire(arg)) // 参数:node节点 是否共享 是否中断 两个时间相关的 acquire(null, arg, false, false, false, 0L); } // AQS除了标识加锁无锁之外还应该提供其他状态 这里留给子类实现, // ReentrantLock没有实现而已,不用抽象类是因为不是所有的子类都要实现 // 见后文思考 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
问题思考
AQS是抽象类为什么子类应该实现的方法没有一个是抽象方法:
AQS是为其他同步组件提供一个强大的基础,不希望别人直接拿来使用,也就是不希望直接new,而是继承实现,所以定义为抽象类。有的方法子类不需要使用,如果用抽象方法实现那么子类必须实现,否则子类又是一个抽象方法,增大了代码的冗余,所以定义为保护方法,可以按需实现。
acquire实现比较复杂,JDK17将各种同步做到了一起,没有很好的封装,并且放弃使用CAS,但是真的晦涩,但是主要还是打包线程放入双向链表,只作简单注释
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { // 获取当前线程 Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued for (;;) { // 如果是头节点返回head,否则返回null // 如果不是头节点,node.pred是当前节点的前驱节点 // 也就是JDK源码敢这么写吧,可读性真的... if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { // 重入过多,小于0,清楚队列,cleanQueeu纯纯算法 if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { // 前前节点是空,onSpinWait是Thread的空方法 Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { // 这里才是理想情况应该进来的 boolean acquired; try { if (shared) // 共享锁执行,tryAcquireShared同样是子类实现 acquired = (tryAcquireShared(arg) >= 0); else // 非共享锁执行 acquired = tryAcquire(arg); } catch (Throwable ex) { // 取消尝试获取锁 cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // 这里做了共享锁和排他锁的不同实现,只是一个标识而已 // 他们都是继承了Node,没有添加任何东西 if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) // Node提供的一系列原子操作 node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) // 这里用到了前置知识,给了一个许可 LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } // 取消尝试获取锁 return cancelAcquire(node, interrupted, interruptible); }
五、总结与写在最后
AQS是Java提供的一个线程安全的同步器,用于管理可重入的线程访问共享资源的顺序。支持不同的同步状态:可中断等待、非阻塞等待、无锁等待。具有可扩展性和灵活性,而且能够保证并发访问时的线程安全性。它适用于许多高并发、共享资源的场景,例如计数器、消息队列等。在使用 AQS 队列同步器时,开发者需要在对共享资源的操作上使用相应的同步器来保证线程安全。AQS由state标识,双向链表和CAS组成,调用了LockSupport的park和unpark功能,ReentrantLock是典型实现。
JDK17的源码真的看不懂,建议读者阅读JDK8/9版本
码文不易,点个赞吧😍