Java 并发编程之AQS

简介: Java 并发编程之AQS

Java 并发编程之AQS

AbstractQueuedSynchronizer (AQS) 是 Java 并发编程中的一个核心框架,广泛用于构建锁和其他同步器(如信号量、读写锁等)。它是 java.util.concurrent.locks 包的一部分。AQS 的设计目的是简化并发同步器的实现。理解 AQS 对于深入理解 Java 并发编程非常重要。

AQS 的基本原理

1. 核心数据结构

AQS 主要依赖一个 FIFO 队列(先进先出队列)来管理获取锁的线程。它的核心数据结构包括:

  • State:一个整型变量,表示同步状态。具体含义取决于具体的同步器(如 ReentrantLock、
  • Semaphore 等)。
  • 队列节点(Node):用来表示请求共享资源的每个线程。
  • 队列头(head)和尾(tail):指向等待队列的头和尾节点。

2. 独占模式和共享模式

AQS 支持两种锁模式:

  • 独占模式:只有一个线程能占有资源。例如 ReentrantLock。
  • 共享模式:多个线程可以共享资源。例如 Semaphore 和 CountDownLatch。

3. 关键方法

AQS 提供了一些需要子类实现的方法(如 tryAcquire、tryRelease 等),子类通过实现这些方法来定义具体的同步逻辑。

AQS 的主要方法

1. 独占模式

  • acquire(int arg):尝试获取资源,如果失败则将线程加入等待队列并阻塞。
  • release(int arg):释放资源,唤醒等待队列中的下一个节点(如果有)。

2. 共享模式

  • acquireShared(int arg):尝试获取共享资源,如果失败则将线程加入等待队列并阻塞。
  • releaseShared(int arg):释放共享资源,唤醒等待队列中的下一个节点(如果有)。

3. 队列操作

  • addWaiter(Node mode):将当前线程加入等待队列。
  • unparkSuccessor(Node node):唤醒等待队列中的下一个节点。

内部类 Node

AQS 内部使用一个 Node 类表示等待队列中的每个节点。Node 类的定义如下

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

具体实现细节

1. 获取独占锁

尝试获取独占锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法的主要流程是:

  • 调用 tryAcquire 尝试获取锁(需要子类实现)。
  • 如果获取失败,调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 使线程进入等待状态。

tryAcquire(int arg)

尝试获取锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

2. 释放独占锁

release 方法是释放独占锁的入口:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 方法的主要流程是:

  • 调用 tryRelease 尝试释放锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryRelease(int arg)

尝试释放锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

3. 获取共享锁

尝试获取共享锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared 方法的主要流程是:

  • 调用 tryAcquireShared 尝试获取共享锁(需要子类实现)。
  • 如果获取失败,调用 doAcquireShared 使线程进入等待状态。

tryAcquireShared(int arg)

尝试获取共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

4. 释放共享锁

releaseShared 方法是释放共享锁的入口:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared 方法的主要流程是:

  • 调用 tryReleaseShared 尝试释放共享锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryReleaseShared(int arg)

尝试释放共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

AQS 内部实现细节

1. addWaiter(Node mode)

将当前线程封装成节点并加入等待队列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 方法尝试快速将节点加入队列的尾部,如果失败则调用 enq 方法进行完整的入队操作。

2. enq(Node node)

将节点加入等待队列:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 方法通过自旋(无限循环)将节点加入队列,确保线程安全。

2. acquireQueued(final Node node, int arg)

使线程进入等待状态,直到获取到锁:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cquireQueued 方法的主要流程是:

  • 尝试获取锁,如果成功则将节点设置为队列头。
  • 如果获取失败,则检查是否应该挂起线程(通过 shouldParkAfterFailedAcquire)。
  • 挂起线程(通过 parkAndCheckInterrupt),直到被唤醒。

3. shouldParkAfterFailedAcquire(Node pred, Node node)

检查是否应该挂起线程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 方法检查前驱节点的状态,如果前驱节点的状态是 SIGNAL,表示当前线程可以安全地挂起。

4. parkAndCheckInterrupt()

挂起线程并检查中断状态:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt 方法使用 LockSupport.park 挂起线程,并在被唤醒后返回中断状态。

详细示例:ReentrantLock

以下是基于 AQS 实现 ReentrantLock 的示例:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyReentrantLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

结论

AQS 是 Java 并发框架中用于实现锁和其他同步器的基础组件。它通过一个 FIFO 队列管理获取锁的线程,并提供模板方法让子类实现具体的同步逻辑。理解 AQS 的工作原理有助于深入理解 Java 并发编程及其高性能实现。

目录
相关文章
|
4天前
|
设计模式 安全 Java
Java编程中的单例模式:理解与实践
【10月更文挑战第31天】在Java的世界里,单例模式是一种优雅的解决方案,它确保一个类只有一个实例,并提供一个全局访问点。本文将深入探讨单例模式的实现方式、使用场景及其优缺点,同时提供代码示例以加深理解。无论你是Java新手还是有经验的开发者,掌握单例模式都将是你技能库中的宝贵财富。
12 2
|
7天前
|
Java API Apache
Java编程如何读取Word文档里的Excel表格,并在保存文本内容时保留表格的样式?
【10月更文挑战第29天】Java编程如何读取Word文档里的Excel表格,并在保存文本内容时保留表格的样式?
35 5
|
1天前
|
安全 Java 编译器
JDK 10中的局部变量类型推断:Java编程的简化与革新
JDK 10引入的局部变量类型推断通过`var`关键字简化了代码编写,提高了可读性。编译器根据初始化表达式自动推断变量类型,减少了冗长的类型声明。虽然带来了诸多优点,但也有一些限制,如只能用于局部变量声明,并需立即初始化。这一特性使Java更接近动态类型语言,增强了灵活性和易用性。
78 53
|
1天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
15 1
|
5天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
6天前
|
Java 开发者
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
31 4
|
6天前
|
消息中间件 供应链 Java
掌握Java多线程编程的艺术
【10月更文挑战第29天】 在当今软件开发领域,多线程编程已成为提升应用性能和响应速度的关键手段之一。本文旨在深入探讨Java多线程编程的核心技术、常见问题以及最佳实践,通过实际案例分析,帮助读者理解并掌握如何在Java应用中高效地使用多线程。不同于常规的技术总结,本文将结合作者多年的实践经验,以故事化的方式讲述多线程编程的魅力与挑战,旨在为读者提供一种全新的学习视角。
27 3
|
4天前
|
设计模式 安全 Java
Java编程中的单例模式深入解析
【10月更文挑战第31天】在编程世界中,设计模式就像是建筑中的蓝图,它们定义了解决常见问题的最佳实践。本文将通过浅显易懂的语言带你深入了解Java中广泛应用的单例模式,并展示如何实现它。
|
6天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
20 2
|
7天前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
19 1
下一篇
无影云桌面