Java 并发编程之AQS

简介: Java 并发编程之AQS

Java 并发编程之AQS

AbstractQueuedSynchronizer (AQS) 是 Java 并发编程中的一个核心框架,广泛用于构建锁和其他同步器(如信号量、读写锁等)。它是 java.util.concurrent.locks 包的一部分。AQS 的设计目的是简化并发同步器的实现。理解 AQS 对于深入理解 Java 并发编程非常重要。

AQS 的基本原理

1. 核心数据结构

AQS 主要依赖一个 FIFO 队列(先进先出队列)来管理获取锁的线程。它的核心数据结构包括:

  • State:一个整型变量,表示同步状态。具体含义取决于具体的同步器(如 ReentrantLock、
  • Semaphore 等)。
  • 队列节点(Node):用来表示请求共享资源的每个线程。
  • 队列头(head)和尾(tail):指向等待队列的头和尾节点。

2. 独占模式和共享模式

AQS 支持两种锁模式:

  • 独占模式:只有一个线程能占有资源。例如 ReentrantLock。
  • 共享模式:多个线程可以共享资源。例如 Semaphore 和 CountDownLatch。

3. 关键方法

AQS 提供了一些需要子类实现的方法(如 tryAcquire、tryRelease 等),子类通过实现这些方法来定义具体的同步逻辑。

AQS 的主要方法

1. 独占模式

  • acquire(int arg):尝试获取资源,如果失败则将线程加入等待队列并阻塞。
  • release(int arg):释放资源,唤醒等待队列中的下一个节点(如果有)。

2. 共享模式

  • acquireShared(int arg):尝试获取共享资源,如果失败则将线程加入等待队列并阻塞。
  • releaseShared(int arg):释放共享资源,唤醒等待队列中的下一个节点(如果有)。

3. 队列操作

  • addWaiter(Node mode):将当前线程加入等待队列。
  • unparkSuccessor(Node node):唤醒等待队列中的下一个节点。

内部类 Node

AQS 内部使用一个 Node 类表示等待队列中的每个节点。Node 类的定义如下

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

具体实现细节

1. 获取独占锁

尝试获取独占锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法的主要流程是:

  • 调用 tryAcquire 尝试获取锁(需要子类实现)。
  • 如果获取失败,调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 使线程进入等待状态。

tryAcquire(int arg)

尝试获取锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

2. 释放独占锁

release 方法是释放独占锁的入口:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 方法的主要流程是:

  • 调用 tryRelease 尝试释放锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryRelease(int arg)

尝试释放锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

3. 获取共享锁

尝试获取共享锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared 方法的主要流程是:

  • 调用 tryAcquireShared 尝试获取共享锁(需要子类实现)。
  • 如果获取失败,调用 doAcquireShared 使线程进入等待状态。

tryAcquireShared(int arg)

尝试获取共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

4. 释放共享锁

releaseShared 方法是释放共享锁的入口:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared 方法的主要流程是:

  • 调用 tryReleaseShared 尝试释放共享锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryReleaseShared(int arg)

尝试释放共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

AQS 内部实现细节

1. addWaiter(Node mode)

将当前线程封装成节点并加入等待队列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 方法尝试快速将节点加入队列的尾部,如果失败则调用 enq 方法进行完整的入队操作。

2. enq(Node node)

将节点加入等待队列:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 方法通过自旋(无限循环)将节点加入队列,确保线程安全。

2. acquireQueued(final Node node, int arg)

使线程进入等待状态,直到获取到锁:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cquireQueued 方法的主要流程是:

  • 尝试获取锁,如果成功则将节点设置为队列头。
  • 如果获取失败,则检查是否应该挂起线程(通过 shouldParkAfterFailedAcquire)。
  • 挂起线程(通过 parkAndCheckInterrupt),直到被唤醒。

3. shouldParkAfterFailedAcquire(Node pred, Node node)

检查是否应该挂起线程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 方法检查前驱节点的状态,如果前驱节点的状态是 SIGNAL,表示当前线程可以安全地挂起。

4. parkAndCheckInterrupt()

挂起线程并检查中断状态:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt 方法使用 LockSupport.park 挂起线程,并在被唤醒后返回中断状态。

详细示例:ReentrantLock

以下是基于 AQS 实现 ReentrantLock 的示例:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyReentrantLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

结论

AQS 是 Java 并发框架中用于实现锁和其他同步器的基础组件。它通过一个 FIFO 队列管理获取锁的线程,并提供模板方法让子类实现具体的同步逻辑。理解 AQS 的工作原理有助于深入理解 Java 并发编程及其高性能实现。

目录
相关文章
|
29天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
31 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
11天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
15天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
49 12
|
11天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
93 2
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
28天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
28天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
50 3
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
159 6
|
1月前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
47 3