一、基本概念
- 全称是
AbstractQueuedSynchronizer
- 是阻塞式锁和相关的同步器工具的框架
★ 特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState:获取 state 状态
- setState:设置 state 状态
- compareAndSetState:cas 机制设置 state 状态(防止多个线程同时设置 state 状态)
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
★ 子类主要实现下面的方法(默认抛出 UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
// 如果获取锁失败 if (!tryAcquire(arg)) { // 入队, 可以选择阻塞当前线程 park unpark }
// 如果释放锁成功 if (tryRelease(arg)) { // 让阻塞线程恢复运行(唤醒阻塞的线程) }
二、自定义锁
@Slf4j(topic = "Test1AQS") public class Test1AQS { private static final MyLock MY_LOCK = new MyLock(); public static void main(String[] args) { t4(); } private static void t3() { new Thread(() -> { boolean isAcquire = false; try { isAcquire = MY_LOCK.tryLock(2000, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } try { if (isAcquire) { log.debug("t3() t1 1"); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (isAcquire) { log.debug("t3() t1 2"); MY_LOCK.unlock(); } } }, "t1").start(); new Thread(() -> { boolean isAcquire = false; try { isAcquire = MY_LOCK.tryLock(2000, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } try { if (isAcquire) { log.debug("t3() t2 1"); } } finally { if (isAcquire) { log.debug("t3() t2 2"); MY_LOCK.unlock(); } } }, "t2").start(); } private static void t2() { new Thread(() -> { boolean isAcquire = MY_LOCK.tryLock(); try { if (isAcquire) { log.debug("t2() t1 1"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (isAcquire) { log.debug("t2() t1 2"); MY_LOCK.unlock(); } } }, "t1").start(); new Thread(() -> { boolean isAcquire = MY_LOCK.tryLock(); try { if (isAcquire) { log.debug("t2() t2 1"); } } finally { if (isAcquire) { log.debug("t2() t2 2"); MY_LOCK.unlock(); } } }, "t2").start(); } private static void t1() { new Thread(() -> { MY_LOCK.lock(); try { log.debug("t1 1"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("t1 2"); MY_LOCK.unlock(); } }, "t1").start(); new Thread(() -> { MY_LOCK.lock(); try { log.debug("t2 1"); } finally { log.debug("t2 2"); MY_LOCK.unlock(); } }, "t2").start(); } private static void t4() { new Thread(() -> { log.debug("lock t4()"); // 不可重入 MY_LOCK.lock(); MY_LOCK.lock(); try { log.debug("t4() t1 1"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("t4() t1 2"); MY_LOCK.unlock(); } }, "t1").start(); } } class MyLock implements Lock { private MySync sync = new MySync(); private static class MySync extends AbstractQueuedSynchronizer { /** * 尝试获取锁 */ @Override protected boolean tryAcquire(int arg) { // 获取成功 if (compareAndSetState(0, 1)) { // 设置互斥锁的拥有线程为当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 */ @Override protected boolean tryRelease(int arg) { // 设置互斥锁的拥有线程为 null setExclusiveOwnerThread(Thread.currentThread()); // setState(0)放在setExclusiveOwnerThread()下面 // 可防止指令重排序(写屏障) setState(0); return true; } /** * 是否持有互斥锁 */ @Override protected boolean isHeldExclusively() { return getState() == 1; } /** * 获取条件变量 */ protected Condition newCondition() { return new ConditionObject(); } } /** * 获取锁 * 若获取失败(进入等待队列) * 不可打断 */ @Override public void lock() { sync.acquire(1); } /** * 获取锁 * 若获取失败(进入等待队列) * 可打断 */ @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试获取锁(只尝试一次) * 若没有获得锁成功,直接返回(不进入等待队列) */ @Override public boolean tryLock() { return sync.tryAcquire(1); } /** * 尝试获取锁 * 若没有获得锁成功,进入等待队列 * 有等待时间限制 */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } /** * 释放锁 */ @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }