从入门到入魂:Lock 体系与 AQS 队列同步器源码级深度剖析

简介: 本文深度解析Java并发核心——Lock体系与AQS(AbstractQueuedSynchronizer)。涵盖Lock接口契约、与synchronized的本质差异、ReentrantLock/ReadWriteLock/StampedLock实现原理;透彻剖析AQS的state同步状态、CLH队列、Node节点、模板方法及acquire/release源码;辅以生产者消费者、自定义共享锁等实战示例,助你打通并发编程任督二脉。

并发编程的核心痛点是多线程对共享资源的安全访问,锁是解决该问题的核心工具。JDK内置的synchronized关键字通过JVM底层实现了隐式的锁机制,而java.util.concurrent.locks包下的显式Lock体系,凭借更灵活的锁控制、更丰富的功能特性,成为并发编程中不可或缺的核心组件。而Lock体系的底层基石,正是大名鼎鼎的AQS(AbstractQueuedSynchronizer)队列同步器。本文将从使用场景到底层源码,从核心设计到实战落地,全方位拆解Lock体系与AQS的核心逻辑,帮你彻底打通并发编程的核心任督二脉。

一、Lock体系核心认知

1.1 Lock接口的核心契约

Lock接口是显式锁体系的顶层抽象,定义了锁的核心操作规范,相比synchronized,它提供了可中断、可超时、非阻塞获取、多条件队列等高级能力。核心方法如下:

  • void lock():阻塞式获取锁,获取成功后返回,不可被中断
  • void lockInterruptibly() throws InterruptedException:可中断的阻塞式获取锁,线程在等待获取锁的过程中可被中断,抛出中断异常
  • boolean tryLock():非阻塞式尝试获取锁,立即返回获取结果,成功true,失败false
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException:带超时时间的尝试获取锁,在超时时间内获取成功返回true,超时或被中断返回false/抛出异常
  • void unlock():释放锁,必须在持有锁的线程中调用,否则抛出IllegalMonitorStateException
  • Condition newCondition():创建与当前锁绑定的条件队列,实现等待/通知机制,支持多个条件队列

1.2 Lock与synchronized的核心差异

特性 Lock显式锁 synchronized隐式锁
实现方式 JDK代码层面实现,基于AQS JVM层面实现,依赖监视器锁(Monitor)
可中断性 支持lockInterruptibly()中断等待 不可中断,获取锁的过程无法响应中断
超时控制 支持tryLock带超时时间,避免无限阻塞 不支持,获取不到锁会无限阻塞
非阻塞获取 支持tryLock()非阻塞尝试,失败立即返回 不支持,只能阻塞等待
公平性 支持公平/非公平两种模式,可灵活配置 仅支持非公平模式,JDK1.5后加入自适应自旋优化
条件队列 支持绑定多个Condition条件队列,精细化等待通知 仅能绑定一个waitSet等待队列,所有wait/notify共用
锁释放 必须手动在finally块调用unlock()释放,否则死锁 自动释放,同步代码块执行完成或异常时JVM自动释放

1.3 Lock体系的核心实现类

Lock体系的核心实现都在java.util.concurrent.locks包下,核心实现类包括:

  • ReentrantLock:可重入独占锁,最常用的Lock实现,支持公平/非公平模式,功能对标synchronized,能力更丰富
  • ReentrantReadWriteLock:可重入读写锁,维护读锁(共享锁)和写锁(独占锁)一对锁,读多写少场景下大幅提升并发性能
  • StampedLock:邮戳锁,JDK8引入的高性能读写锁替代方案,支持乐观读模式,进一步降低读操作的性能开销

二、AQS队列同步器核心设计

2.1 AQS的核心定位

AQS全称AbstractQueuedSynchronizer,是JUC包下用于构建锁和同步组件的基础框架,ReentrantLockReentrantReadWriteLockCountDownLatchSemaphore等几乎所有JUC同步组件,都是基于AQS实现的。AQS封装了同步状态的管理、线程的阻塞与唤醒、等待队列的维护等核心通用逻辑,子类只需通过模板方法模式,实现同步状态的获取与释放的自定义逻辑,即可快速构建出符合业务需求的同步组件。

2.2 AQS的核心设计思想

AQS的核心设计围绕两个核心要素展开:同步状态等待队列

  1. 同步状态(state) AQS内部维护了一个被volatile修饰的int类型变量state,用于表示同步状态:

private volatile int state;

volatile关键字保证了state变量在多线程之间的可见性,所有对state的修改都通过CAS操作保证原子性,从而实现线程安全的同步状态管理。

  • 对于独占锁(如ReentrantLock):state=0表示锁未被持有,state>0表示锁已被持有,重入时state累加
  • 对于共享锁(如Semaphore):state表示可用的许可数量,state>0表示有可用许可,state=0表示无可用许可
  • 对于读写锁(ReentrantReadWriteLock):state的高16位表示读锁的持有数量,低16位表示写锁的持有数量
  1. 等待队列(CLH双向队列) 当线程获取同步状态失败时,AQS会将该线程封装为Node节点,加入到CLH双向队列的尾部,同时阻塞该线程;当同步状态被释放时,AQS会唤醒队列头部的线程,重新尝试获取同步状态。 CLH队列是一个双向链表结构,具备FIFO的特性,AQS通过headtail两个指针分别指向队列的头节点和尾节点:

private transient volatile Node head;
private transient volatile Node tail;

头节点是哨兵节点(哑节点),不关联任何等待线程,仅作为队列的占位符,真正等待的线程从第二个节点开始。这样设计的好处是简化队列的入队和出队逻辑,避免头尾节点的空指针判断。

2.3 Node节点核心结构

Node是AQS的静态内部类,用于封装等待线程、等待状态、队列指针等信息,核心属性如下:

static final class Node {
   static final Node SHARED = new Node();
   static final Node EXCLUSIVE = null;

   static final int CANCELLED =  1;
   static final int SIGNAL    = -1;
   static final int CONDITION = -2;
   static final int PROPAGATE = -3;

   volatile int waitStatus;
   volatile Node prev;
   volatile Node next;
   volatile Thread thread;
   Node nextWaiter;
}

每个waitStatus状态的含义:

  • CANCELLED(1):当前节点的线程因超时或中断被取消,该状态的节点不会再参与锁竞争,会被移出队列
  • SIGNAL(-1):当前节点的后继节点线程被阻塞,当前节点释放锁或被取消时,必须唤醒后继节点的线程
  • CONDITION(-2):当前节点处于条件队列中,等待Condition的signal通知,被通知后会转移到同步队列中
  • PROPAGATE(-3):共享模式下,释放同步状态时需要将唤醒操作传播到后续节点
  • 0:初始状态,节点刚被创建时的默认状态

2.4 AQS的模板方法模式

AQS基于模板方法模式设计,将同步队列管理、线程阻塞唤醒等通用逻辑封装为final方法,子类无法重写;而将同步状态的获取与释放等自定义逻辑,设计为protected方法,子类必须重写这些方法来实现自己的同步逻辑。 需要子类重写的核心方法包括:

  • protected boolean tryAcquire(int arg):独占式尝试获取同步状态,成功返回true,失败返回false
  • protected boolean tryRelease(int arg):独占式尝试释放同步状态,成功返回true,失败返回false
  • protected int tryAcquireShared(int arg):共享式尝试获取同步状态,返回值>=0表示获取成功,<0表示失败
  • protected boolean tryReleaseShared(int arg):共享式尝试释放同步状态,成功返回true,失败返回false
  • protected boolean isHeldExclusively():判断当前线程是否持有独占式同步状态

三、AQS核心源码深度剖析

本文基于JDK17官方源码,逐行拆解AQS核心方法的执行逻辑。

3.1 独占式同步状态获取:acquire(int arg)

acquire方法是独占锁lock()的核心入口,ReentrantLocklock()方法最终会调用该方法,源码如下:

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

整个方法的执行逻辑分为4步:

  1. 第一步:tryAcquire(arg),尝试独占式获取同步状态,该方法由子类实现,成功获取返回true,方法直接结束;获取失败返回false,进入下一步。
  2. 第二步:addWaiter(Node.EXCLUSIVE),将当前线程封装为独占模式的Node节点,加入到同步队列的尾部

private Node addWaiter(Node mode) {
   Node node = new Node(Thread.currentThread(), mode);
   Node pred = tail;
   if (pred != null) {
       node.prev = pred;
       if (compareAndSetTail(pred, node)) {
           pred.next = node;
           return node;
       }
   }
   enq(node);
   return node;
}

先尝试一次快速CAS入队,如果队列已经初始化(tail不为null),直接将当前节点的prev指向原tail,CAS将tail更新为当前节点,成功后将原tail的next指向当前节点,完成入队。如果快速尝试失败,进入enq方法自旋入队。

private Node enq(final Node node) {
   for (;;) {
       Node t = tail;
       if (t == null) {
           if (compareAndSetHead(new Node()))
               tail = head;
       } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
               t.next = node;
               return t;
           }
       }
   }
}

enq方法是一个无限自旋循环,直到节点成功入队:如果队列未初始化,先CAS创建一个哨兵节点作为头节点,完成队列初始化;队列初始化完成后,重复快速入队的逻辑,CAS将当前节点设置为新的tail,成功后返回。队列的初始化是懒加载的,只有当第一个线程获取锁失败时,才会初始化队列。

  1. 第三步:acquireQueued(final Node node, int arg),节点进入队列后,自旋尝试获取同步状态,获取失败则阻塞线程

final boolean acquireQueued(final Node node, int arg) {
   boolean interrupted = false;
   try {
       for (;;) {
           final Node p = node.predecessor();
           if (p == head && tryAcquire(arg)) {
               setHead(node);
               p.next = null;
               return interrupted;
           }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
       }
   } catch (Throwable t) {
       cancelAcquire(node);
       throw t;
   }
}

这个方法是AQS的核心,整个逻辑是一个无限自旋循环,只有两种情况会退出循环:当前节点的前驱节点是头节点且成功获取到同步状态,或线程获取锁的过程中抛出异常。 核心逻辑:

  • 只有前驱节点是头节点的节点,才有资格尝试获取同步状态,保证队列的FIFO特性
  • 获取成功后,将当前节点设置为新的头节点,清空thread和prev指针,成为新的哨兵节点
  • 如果获取失败,调用shouldParkAfterFailedAcquire方法,判断是否需要阻塞当前线程
  • 如果需要阻塞,调用parkAndCheckInterrupt方法阻塞线程,线程被唤醒后检查是否被中断,设置中断标记

shouldParkAfterFailedAcquire方法源码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   int ws = pred.waitStatus;
   if (ws == Node.SIGNAL)
       return true;
   if (ws > 0) {
       do {
           node.prev = pred = pred.prev;
       } while (pred.waitStatus > 0);
       pred.next = node;
   } else {
       compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;
}

核心作用是确保前驱节点的状态为SIGNAL,这样前驱节点释放锁时,一定会唤醒当前节点。只有在前驱节点是SIGNAL状态时才会返回true,否则返回false,继续下一次自旋。

parkAndCheckInterrupt方法源码:

private final boolean parkAndCheckInterrupt() {
   LockSupport.park(this);
   return Thread.interrupted();
}

调用LockSupport.park()阻塞当前线程,线程被唤醒的情况有三种:前驱节点释放锁唤醒、线程被中断、虚假唤醒。线程被唤醒后,调用Thread.interrupted()检查是否被中断,同时清除中断标记,返回中断结果。

  1. 第四步:selfInterrupt(),如果acquireQueued返回true,说明线程在阻塞过程中被中断过,这里恢复线程的中断标记

static void selfInterrupt() {
   Thread.currentThread().interrupt();
}

acquire方法完整执行流程:

3.2 独占式同步状态释放:release(int arg)

release方法是独占锁unlock()的核心入口,ReentrantLockunlock()方法最终会调用该方法,源码如下:

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

执行逻辑:

  1. 调用tryRelease(arg)尝试释放同步状态,该方法由子类实现,释放成功返回true,失败返回false
  2. 如果释放成功,获取头节点,判断头节点不为null且waitStatus不等于0,说明有等待的线程需要唤醒,调用unparkSuccessor(h)唤醒后继节点
  3. 返回释放结果

unparkSuccessor方法源码:

private void unparkSuccessor(Node node) {
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   if (s != null)
       LockSupport.unpark(s.thread);
}

核心逻辑:

  1. 将头节点的waitStatus从SIGNAL重置为0
  2. 尝试获取头节点的直接后继节点,如果后继节点为null或已被取消,就从队尾向前遍历,找到队列中第一个waitStatus<=0的正常等待节点
  3. 找到正常的节点后,调用LockSupport.unpark()唤醒该节点的线程

这里从队尾向前遍历的核心原因:节点入队时,先设置prev指针,再CAS设置tail,最后设置前驱节点的next指针,next指针的设置是最后一步,可能存在prev指针已设置但next指针未设置的情况,从后往前遍历一定能找到所有节点。

3.3 共享式同步状态获取与释放

共享式与独占式的核心区别是:同一时刻可以有多个线程同时获取到同步状态。核心方法是acquireSharedreleaseSharedCountDownLatchSemaphoreReentrantReadWriteLock的读锁都是基于这两个方法实现的。

3.3.1 共享式获取:acquireShared(int arg)

public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

逻辑:调用tryAcquireShared(arg)尝试共享式获取同步状态,返回值>=0表示获取成功,方法结束;返回值<0表示获取失败,进入doAcquireShared方法。

doAcquireShared源码:

private void doAcquireShared(int arg) {
   final Node node = addWaiter(Node.SHARED);
   boolean interrupted = false;
   try {
       for (;;) {
           final Node p = node.predecessor();
           if (p == head) {
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null;
                   return;
               }
           }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
       }
   } catch (Throwable t) {
       cancelAcquire(node);
       throw t;
   } finally {
       if (interrupted)
           selfInterrupt();
   }
}

和独占式的核心区别在于:获取成功后,调用的是setHeadAndPropagate方法,不仅会设置新的头节点,还会将唤醒操作传播到后续的共享节点,保证多个线程可以同时获取同步状态。

3.3.2 共享式释放:releaseShared(int arg)

public final boolean releaseShared(int arg) {
   if (tryReleaseShared(arg)) {
       doReleaseShared();
       return true;
   }
   return false;
}

doReleaseShared方法通过自旋CAS保证释放操作的传播性,确保所有符合条件的共享节点都能被唤醒。

3.4 Condition条件队列实现

Condition接口是Lock体系的等待/通知机制实现,对标synchronized的wait/notify,但是支持多个条件队列,实现更精细化的线程控制。AQS的ConditionObject类实现了Condition接口,每个Condition对象对应一个单向的条件队列,和AQS的同步队列配合,实现等待/通知机制。

3.4.1 await()方法源码剖析

await()方法的核心逻辑是:当前线程释放持有的锁,将自己封装为Node节点加入条件队列,然后阻塞等待,被signal唤醒或中断后,重新进入同步队列竞争锁。

public final void await() throws InterruptedException {
   if (Thread.interrupted())
       throw new InterruptedException();
   Node node = addConditionWaiter();
   int savedState = fullyRelease(node);
   int interruptMode = 0;
   while (!isOnSyncQueue(node)) {
       LockSupport.park(this);
       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
           break;
   }
   if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
       interruptMode = REINTERRUPT;
   if (node.nextWaiter != null)
       unlinkCancelledWaiters();
   if (interruptMode != 0)
       reportInterruptAfterWait(interruptMode);
}

3.4.2 signal()方法源码剖析

signal()方法的核心逻辑是:将条件队列中第一个等待的节点,转移到同步队列中,让其重新参与锁竞争。

public final void signal() {
   if (!isHeldExclusively())
       throw new IllegalMonitorStateException();
   Node first = firstWaiter;
   if (first != null)
       doSignal(first);
}

doSignal方法循环将节点从条件队列中移除,调用transferForSignal方法将节点转移到同步队列,转移成功后,节点进入同步队列,等待被唤醒竞争锁。

Condition等待/通知完整流程:

四、Lock体系核心实现类源码剖析

4.1 ReentrantLock可重入独占锁

ReentrantLock是Lock接口最常用的实现类,是可重入的独占锁,支持公平和非公平两种模式,默认是非公平模式。可重入的意思是,同一个线程可以多次获取同一把锁,不会出现自己阻塞自己的情况。

ReentrantLock内部实现了三个核心内部类:

  • Sync:继承自AQS,是ReentrantLock的核心同步器,实现了通用的锁逻辑
  • NonfairSync:继承自Sync,非公平锁的实现
  • FairSync:继承自Sync,公平锁的实现

4.1.1 非公平锁的实现

非公平锁是ReentrantLock的默认实现,线程获取锁时,不会排队,直接先CAS抢锁,抢不到再进入队列等待,吞吐量更高。

final void lock() {
   if (compareAndSetState(0, 1))
       setExclusiveOwnerThread(Thread.currentThread());
   else
       acquire(1);
}

非公平锁的tryAcquire逻辑:先判断state是否为0,锁未被持有,直接CAS抢锁;如果锁已被当前线程持有,state累加,实现可重入;锁被其他线程持有,返回false。

4.1.2 公平锁的实现

公平锁严格遵循FIFO规则,线程获取锁时,先判断同步队列中是否有等待的线程,如果有,直接进入队列排队,不会抢锁,保证先到先得。 和非公平锁的核心区别:在state为0时,会先调用hasQueuedPredecessors()方法,判断同步队列中是否有等待的线程,如果有,就不会CAS抢锁,直接返回false,进入队列排队,保证公平性。

4.1.3 锁释放的实现

protected final boolean tryRelease(int releases) {
   int c = getState() - releases;
   if (Thread.currentThread() != getExclusiveOwnerThread())
       throw new IllegalMonitorStateException();
   boolean free = false;
   if (c == 0) {
       free = true;
       setExclusiveOwnerThread(null);
   }
   setState(c);
   return free;
}

核心逻辑:只有持有锁的线程才能释放锁,每次unlock只能释放一次,state减1,当state减到0时,锁完全释放,清空持有者线程,返回true。

4.2 ReentrantReadWriteLock可重入读写锁

ReentrantReadWriteLock实现了ReadWriteLock接口,维护了一对锁:读锁(ReadLock,共享锁)和写锁(WriteLock,独占锁)。读锁是共享锁,同一时刻可以有多个线程同时持有读锁;写锁是独占锁,同一时刻只能有一个线程持有写锁,且写锁持有期间,所有读锁都不能被持有。读写锁适合读多写少的场景,相比独占锁,能大幅提升并发性能。

读写锁将AQS的32位int类型state拆分为两部分:

  • 高16位:表示读锁的持有数量,每获取一次读锁,高16位加1
  • 低16位:表示写锁的持有数量,每获取一次写锁,低16位加1,可重入累加

读写锁支持锁降级,不支持锁升级

  • 锁降级:持有写锁的线程,可以先获取读锁,再释放写锁,完成写锁到读锁的降级,保证数据的可见性和一致性
  • 锁升级:持有读锁的线程,尝试获取写锁,会导致死锁,因此ReentrantReadWriteLock不支持锁升级

4.3 StampedLock邮戳锁

StampedLock是JDK8引入的新型锁,是对ReentrantReadWriteLock的优化,核心改进是引入了乐观读模式,进一步降低了读操作的性能开销,提升了高并发场景下的吞吐量。

StampedLock的核心模式:

  1. 写模式:独占锁,获取锁返回一个stamp邮戳,释放锁时需要传入对应的stamp
  2. 悲观读模式:共享锁,获取锁返回一个stamp,释放锁时需要传入对应的stamp
  3. 乐观读模式:无锁模式,不会加锁,直接返回一个stamp,读取数据完成后,调用validate(stamp)验证stamp是否有效,有效说明读取期间没有写操作,数据是安全的;无效说明有写操作,需要升级为悲观读模式重新读取数据

五、实战示例

环境依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>

   <groupId>com.jam</groupId>
   <artifactId>lock-aqs-demo</artifactId>
   <version>1.0.0</version>

   <properties>
       <maven.compiler.source>17</maven.compiler.source>
       <maven.compiler.target>17</maven.compiler.target>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <lombok.version>1.18.34</lombok.version>
       <slf4j.version>2.0.16</slf4j.version>
       <spring.version>6.1.14</spring.version>
       <guava.version>33.2.1-jre</guava.version>
       <fastjson2.version>2.0.54</fastjson2.version>
       <mybatis-plus.version>3.5.7</mybatis-plus.version>
       <swagger.version>2.5.0</swagger.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
           <version>${slf4j.version}</version>
       </dependency>
       <dependency>
           <groupId>ch.qos.logback</groupId>
           <artifactId>logback-classic</artifactId>
           <version>1.5.11</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-core</artifactId>
           <version>${spring.version}</version>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>${swagger.version}</version>
       </dependency>
   </dependencies>
</project>

示例1:ReentrantLock基础使用

package com.jam.demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.concurrent.locks.ReentrantLock;

/**
* ReentrantLock 基础使用示例
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
public class ReentrantLockDemo {

   private static final ReentrantLock UNFAIR_LOCK = new ReentrantLock();
   private static final ReentrantLock FAIR_LOCK = new ReentrantLock(true);
   private static int count = 0;

   public void unfairIncrement(String threadName) {
       if (!StringUtils.hasText(threadName)) {
           log.error("线程名称不能为空");
           return;
       }
       UNFAIR_LOCK.lock();
       try {
           count++;
           log.info("非公平锁-线程{}执行递增,当前count:{}", threadName, count);
           Thread.sleep(10);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           log.error("线程{}执行被中断", threadName, e);
       } finally {
           UNFAIR_LOCK.unlock();
       }
   }

   public void fairIncrement(String threadName) {
       if (!StringUtils.hasText(threadName)) {
           log.error("线程名称不能为空");
           return;
       }
       FAIR_LOCK.lock();
       try {
           count++;
           log.info("公平锁-线程{}执行递增,当前count:{}", threadName, count);
           Thread.sleep(10);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           log.error("线程{}执行被中断", threadName, e);
       } finally {
           FAIR_LOCK.unlock();
       }
   }

   public void reentrantDemo(String threadName, int level) {
       if (!StringUtils.hasText(threadName) || level < 1) {
           log.error("参数非法");
           return;
       }
       UNFAIR_LOCK.lock();
       try {
           log.info("线程{}第{}次获取锁,重入层级:{}", threadName, level, level);
           if (level < 3) {
               reentrantDemo(threadName, level + 1);
           }
       } finally {
           UNFAIR_LOCK.unlock();
           log.info("线程{}第{}次释放锁,重入层级:{}", threadName, level, level);
       }
   }

   public static void main(String[] args) {
       ReentrantLockDemo demo = new ReentrantLockDemo();

       log.info("===== 非公平锁演示 =====");
       for (int i = 0; i < 5; i++) {
           final int threadNum = i;
           new Thread(() -> {
               for (int j = 0; j < 2; j++) {
                   demo.unfairIncrement(String.valueOf(threadNum));
               }
           }).start();
       }

       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
       }

       count = 0;
       log.info("\n===== 公平锁演示 =====");
       for (int i = 0; i < 5; i++) {
           final int threadNum = i;
           new Thread(() -> {
               for (int j = 0; j < 2; j++) {
                   demo.fairIncrement(String.valueOf(threadNum));
               }
           }).start();
       }

       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
       }

       log.info("\n===== 可重入性演示 =====");
       new Thread(() -> demo.reentrantDemo("reentrant-test", 1)).start();
   }
}

示例2:ReentrantReadWriteLock实现线程安全缓存

package com.jam.demo;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;

import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 基于读写锁实现的线程安全缓存
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
public class ReadWriteLockCacheDemo {

   private final Map<String, Object> cacheMap = Maps.newConcurrentMap();
   private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

   public Object get(String key) {
       if (ObjectUtils.isEmpty(key)) {
           log.error("缓存key不能为空");
           return null;
       }
       readLock.lock();
       try {
           log.info("线程{}读取缓存key:{}", Thread.currentThread().getName(), key);
           return cacheMap.get(key);
       } finally {
           readLock.unlock();
       }
   }

   public void put(String key, Object value) {
       if (ObjectUtils.isEmpty(key)) {
           log.error("缓存key不能为空");
           return;
       }
       writeLock.lock();
       try {
           log.info("线程{}写入缓存key:{}, value:{}", Thread.currentThread().getName(), key, value);
           cacheMap.put(key, value);
       } finally {
           writeLock.unlock();
       }
   }

   public Object lockDegradeDemo(String key) {
       if (ObjectUtils.isEmpty(key)) {
           log.error("缓存key不能为空");
           return null;
       }
       Object value;
       writeLock.lock();
       try {
           value = System.currentTimeMillis();
           cacheMap.put(key, value);
           log.info("线程{}写入数据,key:{}, value:{}", Thread.currentThread().getName(), key, value);
           readLock.lock();
           log.info("线程{}获取读锁,完成锁降级", Thread.currentThread().getName());
       } finally {
           writeLock.unlock();
           log.info("线程{}释放写锁", Thread.currentThread().getName());
       }

       try {
           value = cacheMap.get(key);
           log.info("线程{}读取数据,key:{}, value:{}", Thread.currentThread().getName(), key, value);
           return value;
       } finally {
           readLock.unlock();
           log.info("线程{}释放读锁", Thread.currentThread().getName());
       }
   }

   public static void main(String[] args) {
       ReadWriteLockCacheDemo cache = new ReadWriteLockCacheDemo();

       for (int i = 0; i < 10; i++) {
           new Thread(() -> {
               for (int j = 0; j < 5; j++) {
                   cache.get("test-key");
                   try {
                       Thread.sleep(100);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               }
           }, "read-thread-" + i).start();
       }

       for (int i = 0; i < 2; i++) {
           final int num = i;
           new Thread(() -> {
               for (int j = 0; j < 3; j++) {
                   cache.put("test-key", "value-" + num + "-" + j);
                   try {
                       Thread.sleep(500);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               }
           }, "write-thread-" + i).start();
       }

       try {
           Thread.sleep(3000);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
       }

       log.info("\n===== 锁降级演示 =====");
       new Thread(() -> cache.lockDegradeDemo("degrade-key"), "degrade-thread").start();
   }
}

示例3:Condition实现生产者消费者模型

package com.jam.demo;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 基于Condition实现的生产者消费者模型
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
public class ConditionProducerConsumerDemo {

   private static final int MAX_CAPACITY = 10;
   private final List<Integer> queue = Lists.newLinkedList();
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition notEmpty = lock.newCondition();
   private final Condition notFull = lock.newCondition();

   public void produce(Integer data) throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == MAX_CAPACITY) {
               log.info("队列已满,生产者线程{}等待", Thread.currentThread().getName());
               notFull.await();
           }
           queue.add(data);
           log.info("生产者线程{}生产数据:{}, 当前队列大小:{}", Thread.currentThread().getName(), data, queue.size());
           notEmpty.signalAll();
       } finally {
           lock.unlock();
       }
   }

   public Integer consume() throws InterruptedException {
       lock.lock();
       try {
           while (queue.isEmpty()) {
               log.info("队列为空,消费者线程{}等待", Thread.currentThread().getName());
               notEmpty.await();
           }
           Integer data = queue.remove(0);
           log.info("消费者线程{}消费数据:{}, 当前队列大小:{}", Thread.currentThread().getName(), data, queue.size());
           notFull.signalAll();
           return data;
       } finally {
           lock.unlock();
       }
   }

   public static void main(String[] args) {
       ConditionProducerConsumerDemo demo = new ConditionProducerConsumerDemo();

       for (int i = 0; i < 2; i++) {
           final int producerNum = i;
           new Thread(() -> {
               int data = 0;
               while (true) {
                   try {
                       demo.produce(producerNum * 100 + data);
                       data++;
                       Thread.sleep(200);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       break;
                   }
               }
           }, "producer-" + i).start();
       }

       for (int i = 0; i < 3; i++) {
           new Thread(() -> {
               while (true) {
                   try {
                       demo.consume();
                       Thread.sleep(500);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       break;
                   }
               }
           }, "consumer-" + i).start();
       }
   }
}

示例4:基于AQS自定义同步组件

package com.jam.demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
* 基于AQS自定义共享锁:同一时间最多允许2个线程同时访问
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
public class CustomShareLock {

   private static final int MAX_PERMIT = 2;

   private static class Sync extends AbstractQueuedSynchronizer {

       public Sync() {
           setState(MAX_PERMIT);
       }

       @Override
       protected int tryAcquireShared(int arg) {
           for (;;) {
               int current = getState();
               int remain = current - arg;
               if (remain < 0 || compareAndSetState(current, remain)) {
                   return remain;
               }
           }
       }

       @Override
       protected boolean tryReleaseShared(int arg) {
           for (;;) {
               int current = getState();
               int newState = current + arg;
               if (newState > MAX_PERMIT) {
                   throw new IllegalArgumentException("许可数量超过最大值");
               }
               if (compareAndSetState(current, newState)) {
                   return true;
               }
           }
       }
   }

   private final Sync sync = new Sync();

   public void lock() {
       sync.acquireShared(1);
   }

   public void unlock() {
       sync.releaseShared(1);
   }

   public static void main(String[] args) {
       CustomShareLock lock = new CustomShareLock();

       for (int i = 0; i < 5; i++) {
           final int threadNum = i;
           new Thread(() -> {
               while (true) {
                   lock.lock();
                   try {
                       log.info("线程{}获取锁,执行业务逻辑", threadNum);
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   } finally {
                       log.info("线程{}释放锁", threadNum);
                       lock.unlock();
                   }
                   try {
                       Thread.sleep(500);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               }
           }, "thread-" + i).start();
       }
   }
}

示例5:Spring Boot接口集成(带Swagger3注解)

package com.jam.demo.controller;

import com.jam.demo.service.LockDemoService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

/**
* 锁演示接口
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
@RestController
@RequestMapping("/lock/demo")
@RequiredArgsConstructor
@Tag(name = "锁演示接口", description = "Lock体系与AQS演示接口")
public class LockDemoController {

   private final LockDemoService lockDemoService;

   @PostMapping("/increment")
   @Operation(summary = "递增计数", description = "基于ReentrantLock实现线程安全的计数递增")
   public Integer increment(
           @Parameter(description = "锁类型:fair-公平锁,unfair-非公平锁", required = true)

           @RequestParam String type) {
       return lockDemoService.increment(type);
   }

   @GetMapping("/cache/get")
   @Operation(summary = "获取缓存数据", description = "基于读写锁实现的线程安全缓存读取")
   public Object getCache(
           @Parameter(description = "缓存key", required = true)

           @RequestParam String key) {
       return lockDemoService.getCache(key);
   }

   @PostMapping("/cache/put")
   @Operation(summary = "写入缓存数据", description = "基于读写锁实现的线程安全缓存写入")
   public void putCache(
           @Parameter(description = "缓存key", required = true)

           @RequestParam String key,
           @Parameter(description = "缓存value", required = true)
           @RequestParam String value) {
       lockDemoService.putCache(key, value);
   }
}

package com.jam.demo.service;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 锁演示服务
*
* @author ken
* @date 2026-03-18
*/

@Slf4j
@Service
public class LockDemoService {

   private final ReentrantLock unfairLock = new ReentrantLock();
   private final ReentrantLock fairLock = new ReentrantLock(true);
   private volatile int count = 0;

   private final Map<String, Object> cacheMap = Maps.newConcurrentMap();
   private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

   public Integer increment(String type) {
       if (!StringUtils.hasText(type)) {
           throw new IllegalArgumentException("锁类型不能为空");
       }
       ReentrantLock lock = "fair".equals(type) ? fairLock : unfairLock;
       lock.lock();
       try {
           count++;
           return count;
       } finally {
           lock.unlock();
       }
   }

   public Object getCache(String key) {
       if (!StringUtils.hasText(key)) {
           throw new IllegalArgumentException("缓存key不能为空");
       }
       readLock.lock();
       try {
           return cacheMap.get(key);
       } finally {
           readLock.unlock();
       }
   }

   public void putCache(String key, String value) {
       if (!StringUtils.hasText(key)) {
           throw new IllegalArgumentException("缓存key不能为空");
       }
       writeLock.lock();
       try {
           cacheMap.put(key, value);
       } finally {
           writeLock.unlock();
       }
   }
}

六、易混淆点深度辨析

  1. Lock与synchronized的核心选择标准
  • 优先使用synchronized:简单场景,不需要高级特性,JVM自动优化,开发成本低,不会出现锁泄漏
  • 必须使用Lock:需要可中断、超时控制、公平锁、多个条件队列等高级特性,精细化锁控制
  1. 公平锁与非公平锁的性能差异
  • 非公平锁吞吐量更高:线程获取锁时先CAS抢锁,减少了线程阻塞和唤醒的开销,但是可能出现线程饥饿
  • 公平锁严格保证顺序:不会出现线程饥饿,但是吞吐量低,所有线程都要排队,开销大
  • 生产环境默认使用非公平锁,除非业务场景必须严格保证先到先得的顺序
  1. AQS的头节点为什么是哨兵节点哨兵节点的核心作用是简化队列的入队和出队逻辑,避免头尾节点的空指针判断。有哨兵节点,队列永远不会空,入队只需要处理tail指针,出队只需要处理head指针,逻辑更简单,CAS操作的竞争点更少,性能更高。
  2. park/unpark与wait/notify的核心区别
特性 LockSupport.park/unpark Object.wait/notify
锁依赖 不需要持有锁,任何时候都可以调用 必须在synchronized同步块中持有对象锁才能调用
调用顺序 可以先unpark,后park,线程不会阻塞 必须先wait,后notify,否则会丢失通知,线程永久阻塞
中断处理 park不会抛出中断异常,只会返回,需要手动检查中断状态 wait会抛出InterruptedException,必须捕获处理
唤醒粒度 unpark可以精准唤醒指定的线程 notify只能随机唤醒一个等待线程,无法精准唤醒

七、生产环境最佳实践

  1. 锁释放必须在finally块中显式Lock必须手动调用unlock()释放,必须在finally块中调用,否则业务代码抛出异常时,锁无法释放,导致死锁。
  2. 优先使用非公平锁除非业务场景必须严格保证公平性,否则优先使用默认的非公平锁,非公平锁的吞吐量远高于公平锁。
  3. 合理使用tryLock避免死锁使用tryLock带超时时间的方法获取锁,避免线程无限阻塞,当获取锁超时的时候,可以回滚操作,释放已经获取的锁,有效避免死锁问题。
  4. 读写锁在读多写少场景使用ReentrantReadWriteLock适合读多写少的场景,比如缓存、配置中心等。如果读写频率差不多,甚至写多写少,读写锁的性能不如独占锁。
  5. 避免在持有锁的期间执行耗时操作持有锁的期间执行耗时的IO操作、网络调用等,会导致锁持有时间过长,其他线程长时间阻塞,吞吐量大幅下降。应该尽量缩小锁的范围,只在访问共享资源的核心代码块加锁。
  6. 优先使用StampedLock优化高并发读场景对于读多写少的高并发场景,优先使用StampedLock的乐观读模式,相比ReentrantReadWriteLock,乐观读模式不需要加锁,性能更高,能大幅提升系统的吞吐量。

总结

Lock体系与AQS是Java并发编程的核心基石,AQS封装了同步状态管理、等待队列维护、线程阻塞与唤醒等通用的底层逻辑,让上层的锁和同步组件只需要关注业务相关的同步状态获取与释放逻辑,大幅降低了并发组件的开发成本。

目录
相关文章
|
9天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5288 11
|
16天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
21367 116
|
13天前
|
人工智能 安全 前端开发
Team 版 OpenClaw:HiClaw 开源,5 分钟完成本地安装
HiClaw 基于 OpenClaw、Higress AI Gateway、Element IM 客户端+Tuwunel IM 服务器(均基于 Matrix 实时通信协议)、MinIO 共享文件系统打造。
8172 7

热门文章

最新文章