概述
全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点: 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
- tryAcquire(int arg):独占获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
- tryRelease(int arg) 独占式释放同步状态
- tryAcquireShared(int arg) 共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败。
- tryReleaseShared(int arg) 共享式释放同步状态。
- isHeldExclusively:当前同步器是否在独占模式下被线程占用,一般该方法表示同步器是否被当前线程独占
获取锁的姿势
1. // 如果获取锁失败 2. if (!tryAcquire(arg)) { 3. // 入队, 可以选择阻塞当前线程 park unpark 4. }
释放锁的姿势
1. // 如果释放锁成功 2. if (tryRelease(arg)) { 3. // 让阻塞线程恢复运行 4. }
手写不可重入锁
自定义同步器
1. final class MySync extends AbstractQueuedSynchronizer { 2. @Override 3. protected boolean tryAcquire(int acquires) { 4. if (acquires == 1) { 5. if (compareAndSetState(0, 1)) { 6. setExclusiveOwnerThread(Thread.currentThread()); 7. return true; 8. } 9. } 10. return false; 11. } 12. 13. @Override 14. protected boolean tryRelease(int acquires) { 15. if (acquires == 1) { 16. if (getState() == 0) { 17. throw new IllegalMonitorStateException(); 18. } 19. setExclusiveOwnerThread(null); 20. setState(0); 21. return true; 22. } 23. return false; 24. } 25. 26. protected Condition newCondition() { 27. return new ConditionObject(); 28. } 29. 30. @Override 31. protected boolean isHeldExclusively() { 32. return getState() == 1; 33. } 34. }
自定义锁
1. class MyLock implements Lock { 2. static MySync sync = new MySync(); 3. @Override 4. // 尝试,不成功,进入等待队列 5. public void lock() { 6. sync.acquire(1); 7. } 8. @Override 9. // 尝试,不成功,进入等待队列,可打断 10. public void lockInterruptibly() throws InterruptedException { 11. sync.acquireInterruptibly(1); 12. } 13. @Override 14. // 尝试一次,不成功返回,不进入队列 15. public boolean tryLock() { 16. return sync.tryAcquire(1); 17. } 18. @Override 19. // 尝试,不成功,进入等待队列,有时限 20. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 21. return sync.tryAcquireNanos(1, unit.toNanos(time)); 22. } 23. @Override 24. // 释放锁 25. public void unlock() { 26. sync.release(1); 27. } 28. @Override 29. // 生成条件变量 30. public Condition newCondition() { 31. return sync.newCondition(); 32. } 33. }
测试
1. MyLock lock = new MyLock(); 2. new Thread(() -> { 3. lock.lock(); 4. try { 5. log.debug("locking..."); 6. sleep(1); 7. } finally { 8. log.debug("unlocking..."); 9. lock.unlock(); 10. } 11. },"t1").start(); 12. new Thread(() -> { 13. lock.lock(); 14. try { 15. log.debug("locking..."); 16. } finally { 17. log.debug("unlocking..."); 18. lock.unlock(); 19. } 20. },"t2").start();
输出
22:29:28.727 c.TestAqs [t1] - locking...
22:29:29.732 c.TestAqs [t1] - unlocking...
22:29:29.732 c.TestAqs [t2] - locking...
22:29:29.732 c.TestAqs [t2] - unlocking...
不可重入测试
如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)
1. lock.lock(); 2. log.debug("locking..."); 3. lock.lock(); 4. log.debug("locking...");
具体
起源
早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。
目标
AQS 要实现的功能目标
- 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
- 获取锁超时机制
- 通过打断取消机制
- 独占机制及共享机制
- 条件不满足时的等待机制
要实现的性能目标
Instead, the primary performance goal here is scalability: to predictably maintain efficiency even, orespecially, when synchronizers are contended.
设计
AQS 的基本思想其实很简单
获取锁的逻辑
1. while(state 状态不允许获取) { 2. if(队列中还没有此线程) { 3. 入队并阻塞 4. } 5. } 6. 当前线程出队
释放锁的逻辑
1. if(state 状态允许了) { 2. 恢复阻塞的线程(s) 3. }
要点:
原子维护 state 状态
阻塞及恢复线程
维护队列
state 设计
- state 使用 volatile 配合 cas 保证其修改时的原子性
- state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
阻塞恢复设计
- 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到
- 解决方法是使用 park & unpark 来实现线程的暂停和恢复,先 unpark 再 park 也没问题
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
- park 线程还可以通过 interrupt 打断
队列设计
- 使用了 FIFO 先入先出队列,并不支持优先级队列
- 设计时借鉴了 CLH 队列,它是一种单向无锁队列
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态 入队伪代码,只需要考虑 tail 赋值的原子性,
1. do { 2. // 原来的 tail 3. Node prev = tail; 4. // 用 cas 在原来 tail 的基础上改为 node 5. } while(tail.compareAndSet(prev, node))
出队伪代码
1. // prev 是上一个节点 2. while((Node prev=node.prev).state != 唤醒状态) { 3. } 4. // 设置头节点 5. head = node;
为什么 AQS 需要一个虚拟 head 节点
每个节点都必须设置前置节点的 ws 状态为 SIGNAL(-1),因为每个节点在休眠前,都需要将前置节点的 ws 设置成 SIGNAL。否则自己永远无法被唤醒,所以必须要一个前置节点,而这个前置节点,实际上就是当前持有锁的节点。
由于第一个节点他是没有前置节点的,就创建一个假的。
总结下来就是:每个节点都需要设置前置节点的 ws 状态(这个状态为是为了保证数据一致性),而第一个节点是没有前置节点的,所以需要创建一个虚拟节点。