Java 并发编程之AQS

简介: Java 并发编程之AQS

Java 并发编程之AQS

AbstractQueuedSynchronizer (AQS) 是 Java 并发编程中的一个核心框架,广泛用于构建锁和其他同步器(如信号量、读写锁等)。它是 java.util.concurrent.locks 包的一部分。AQS 的设计目的是简化并发同步器的实现。理解 AQS 对于深入理解 Java 并发编程非常重要。

AQS 的基本原理

1. 核心数据结构

AQS 主要依赖一个 FIFO 队列(先进先出队列)来管理获取锁的线程。它的核心数据结构包括:

  • State:一个整型变量,表示同步状态。具体含义取决于具体的同步器(如 ReentrantLock、
  • Semaphore 等)。
  • 队列节点(Node):用来表示请求共享资源的每个线程。
  • 队列头(head)和尾(tail):指向等待队列的头和尾节点。

2. 独占模式和共享模式

AQS 支持两种锁模式:

  • 独占模式:只有一个线程能占有资源。例如 ReentrantLock。
  • 共享模式:多个线程可以共享资源。例如 Semaphore 和 CountDownLatch。

3. 关键方法

AQS 提供了一些需要子类实现的方法(如 tryAcquire、tryRelease 等),子类通过实现这些方法来定义具体的同步逻辑。

AQS 的主要方法

1. 独占模式

  • acquire(int arg):尝试获取资源,如果失败则将线程加入等待队列并阻塞。
  • release(int arg):释放资源,唤醒等待队列中的下一个节点(如果有)。

2. 共享模式

  • acquireShared(int arg):尝试获取共享资源,如果失败则将线程加入等待队列并阻塞。
  • releaseShared(int arg):释放共享资源,唤醒等待队列中的下一个节点(如果有)。

3. 队列操作

  • addWaiter(Node mode):将当前线程加入等待队列。
  • unparkSuccessor(Node node):唤醒等待队列中的下一个节点。

内部类 Node

AQS 内部使用一个 Node 类表示等待队列中的每个节点。Node 类的定义如下

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

具体实现细节

1. 获取独占锁

尝试获取独占锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法的主要流程是:

  • 调用 tryAcquire 尝试获取锁(需要子类实现)。
  • 如果获取失败,调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 使线程进入等待状态。

tryAcquire(int arg)

尝试获取锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

2. 释放独占锁

release 方法是释放独占锁的入口:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 方法的主要流程是:

  • 调用 tryRelease 尝试释放锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryRelease(int arg)

尝试释放锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

3. 获取共享锁

尝试获取共享锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared 方法的主要流程是:

  • 调用 tryAcquireShared 尝试获取共享锁(需要子类实现)。
  • 如果获取失败,调用 doAcquireShared 使线程进入等待状态。

tryAcquireShared(int arg)

尝试获取共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

4. 释放共享锁

releaseShared 方法是释放共享锁的入口:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared 方法的主要流程是:

  • 调用 tryReleaseShared 尝试释放共享锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryReleaseShared(int arg)

尝试释放共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

AQS 内部实现细节

1. addWaiter(Node mode)

将当前线程封装成节点并加入等待队列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 方法尝试快速将节点加入队列的尾部,如果失败则调用 enq 方法进行完整的入队操作。

2. enq(Node node)

将节点加入等待队列:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 方法通过自旋(无限循环)将节点加入队列,确保线程安全。

2. acquireQueued(final Node node, int arg)

使线程进入等待状态,直到获取到锁:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cquireQueued 方法的主要流程是:

  • 尝试获取锁,如果成功则将节点设置为队列头。
  • 如果获取失败,则检查是否应该挂起线程(通过 shouldParkAfterFailedAcquire)。
  • 挂起线程(通过 parkAndCheckInterrupt),直到被唤醒。

3. shouldParkAfterFailedAcquire(Node pred, Node node)

检查是否应该挂起线程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 方法检查前驱节点的状态,如果前驱节点的状态是 SIGNAL,表示当前线程可以安全地挂起。

4. parkAndCheckInterrupt()

挂起线程并检查中断状态:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt 方法使用 LockSupport.park 挂起线程,并在被唤醒后返回中断状态。

详细示例:ReentrantLock

以下是基于 AQS 实现 ReentrantLock 的示例:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyReentrantLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

结论

AQS 是 Java 并发框架中用于实现锁和其他同步器的基础组件。它通过一个 FIFO 队列管理获取锁的线程,并提供模板方法让子类实现具体的同步逻辑。理解 AQS 的工作原理有助于深入理解 Java 并发编程及其高性能实现。

目录
相关文章
|
4天前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
|
20小时前
|
Java 程序员 调度
Java并发编程之Executor框架深度解析
【6月更文挑战第24天】在Java的并发编程领域,Executor框架是处理多线程任务的核心。本文将深入探讨Executor框架的设计哲学、核心组件以及如何高效利用这一框架来提升程序的性能和响应性。我们将通过实例演示如何正确配置和使用Executor,并讨论常见的陷阱与最佳实践。
|
2天前
|
Java
Java并发编程:深入理解synchronized与ReentrantLock
【6月更文挑战第22天】本文将深入探讨Java并发编程中两个重要的同步机制:synchronized关键字和ReentrantLock类。我们将通过实例分析它们之间的差异,以及在实际应用中如何根据场景选择恰当的同步工具。
|
1天前
|
存储 安全 Java
java编程SimpleDateFormat详解
java编程SimpleDateFormat详解
|
2天前
|
Java
Java并发编程中锁的释放
Java并发编程中锁的释放
12 1
|
15小时前
|
Java 机器人 数据库
Java中的Servlet编程:从基础到高级应用
Java中的Servlet编程:从基础到高级应用
|
15小时前
|
Java 机器人 程序员
Java中的反射编程实用指南
Java中的反射编程实用指南
|
15小时前
|
Java 机器人 程序员
Java中的GUI编程入门指南
Java中的GUI编程入门指南
|
16小时前
|
监控 网络协议 安全
Java中的WebSocket编程详解
Java中的WebSocket编程详解
|
1天前
|
人工智能 算法 搜索推荐
Java算法编程详解和程序实例
Java算法编程详解和程序实例