Java 并发编程之AQS

简介: Java 并发编程之AQS

Java 并发编程之AQS

AbstractQueuedSynchronizer (AQS) 是 Java 并发编程中的一个核心框架,广泛用于构建锁和其他同步器(如信号量、读写锁等)。它是 java.util.concurrent.locks 包的一部分。AQS 的设计目的是简化并发同步器的实现。理解 AQS 对于深入理解 Java 并发编程非常重要。

AQS 的基本原理

1. 核心数据结构

AQS 主要依赖一个 FIFO 队列(先进先出队列)来管理获取锁的线程。它的核心数据结构包括:

  • State:一个整型变量,表示同步状态。具体含义取决于具体的同步器(如 ReentrantLock、
  • Semaphore 等)。
  • 队列节点(Node):用来表示请求共享资源的每个线程。
  • 队列头(head)和尾(tail):指向等待队列的头和尾节点。

2. 独占模式和共享模式

AQS 支持两种锁模式:

  • 独占模式:只有一个线程能占有资源。例如 ReentrantLock。
  • 共享模式:多个线程可以共享资源。例如 Semaphore 和 CountDownLatch。

3. 关键方法

AQS 提供了一些需要子类实现的方法(如 tryAcquire、tryRelease 等),子类通过实现这些方法来定义具体的同步逻辑。

AQS 的主要方法

1. 独占模式

  • acquire(int arg):尝试获取资源,如果失败则将线程加入等待队列并阻塞。
  • release(int arg):释放资源,唤醒等待队列中的下一个节点(如果有)。

2. 共享模式

  • acquireShared(int arg):尝试获取共享资源,如果失败则将线程加入等待队列并阻塞。
  • releaseShared(int arg):释放共享资源,唤醒等待队列中的下一个节点(如果有)。

3. 队列操作

  • addWaiter(Node mode):将当前线程加入等待队列。
  • unparkSuccessor(Node node):唤醒等待队列中的下一个节点。

内部类 Node

AQS 内部使用一个 Node 类表示等待队列中的每个节点。Node 类的定义如下

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

具体实现细节

1. 获取独占锁

尝试获取独占锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法的主要流程是:

  • 调用 tryAcquire 尝试获取锁(需要子类实现)。
  • 如果获取失败,调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 使线程进入等待状态。

tryAcquire(int arg)

尝试获取锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

2. 释放独占锁

release 方法是释放独占锁的入口:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 方法的主要流程是:

  • 调用 tryRelease 尝试释放锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryRelease(int arg)

尝试释放锁,需要子类实现。例如,ReentrantLock 中的实现如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

3. 获取共享锁

尝试获取共享锁,如果获取失败,则将当前线程加入等待队列并阻塞:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared 方法的主要流程是:

  • 调用 tryAcquireShared 尝试获取共享锁(需要子类实现)。
  • 如果获取失败,调用 doAcquireShared 使线程进入等待状态。

tryAcquireShared(int arg)

尝试获取共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

4. 释放共享锁

releaseShared 方法是释放共享锁的入口:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared 方法的主要流程是:

  • 调用 tryReleaseShared 尝试释放共享锁(需要子类实现)。
  • 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。

tryReleaseShared(int arg)

尝试释放共享锁,需要子类实现。例如,Semaphore 中的实现如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

AQS 内部实现细节

1. addWaiter(Node mode)

将当前线程封装成节点并加入等待队列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 方法尝试快速将节点加入队列的尾部,如果失败则调用 enq 方法进行完整的入队操作。

2. enq(Node node)

将节点加入等待队列:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq 方法通过自旋(无限循环)将节点加入队列,确保线程安全。

2. acquireQueued(final Node node, int arg)

使线程进入等待状态,直到获取到锁:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cquireQueued 方法的主要流程是:

  • 尝试获取锁,如果成功则将节点设置为队列头。
  • 如果获取失败,则检查是否应该挂起线程(通过 shouldParkAfterFailedAcquire)。
  • 挂起线程(通过 parkAndCheckInterrupt),直到被唤醒。

3. shouldParkAfterFailedAcquire(Node pred, Node node)

检查是否应该挂起线程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 方法检查前驱节点的状态,如果前驱节点的状态是 SIGNAL,表示当前线程可以安全地挂起。

4. parkAndCheckInterrupt()

挂起线程并检查中断状态:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt 方法使用 LockSupport.park 挂起线程,并在被唤醒后返回中断状态。

详细示例:ReentrantLock

以下是基于 AQS 实现 ReentrantLock 的示例:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyReentrantLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

结论

AQS 是 Java 并发框架中用于实现锁和其他同步器的基础组件。它通过一个 FIFO 队列管理获取锁的线程,并提供模板方法让子类实现具体的同步逻辑。理解 AQS 的工作原理有助于深入理解 Java 并发编程及其高性能实现。

目录
打赏
0
0
0
0
22
分享
相关文章
|
16天前
|
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
前言 主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。 同步队列(CLH队列) 作用:管理需要获...
59 18
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
【Java并发】【AQS】适合初学者体质的AQS入门
AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。 那说到研究AQS,那我们应该,使用开始说起🤓 入门 什么是AQS? AQS(Abst
66 8
【Java并发】【AQS】适合初学者体质的AQS入门
k8s的出现解决了java并发编程胡问题了
Kubernetes通过提供自动化管理、资源管理、服务发现和负载均衡、持续交付等功能,有效地解决了Java并发编程中的许多复杂问题。它不仅简化了线程管理和资源共享,还提供了强大的负载均衡和故障恢复机制,确保应用程序在高并发环境下的高效运行和稳定性。通过合理配置和使用Kubernetes,开发者可以显著提高Java应用程序的性能和可靠性。
72 31
注解的艺术:Java编程的高级定制
注解是Java编程中的高级特性,通过内置注解、自定义注解及注解处理器,可以实现代码的高度定制和扩展。通过理解和掌握注解的使用方法,开发者可以提高代码的可读性、可维护性和开发效率。在实际应用中,注解广泛用于框架开发、代码生成和配置管理等方面,展示了其强大的功能和灵活性。
68 25
在线编程实现!如何在Java后端通过DockerClient操作Docker生成python环境
以上内容是一个简单的实现在Java后端中通过DockerClient操作Docker生成python环境并执行代码,最后销毁的案例全过程,也是实现一个简单的在线编程后端API的完整流程,你可以在此基础上添加额外的辅助功能,比如上传文件、编辑文件、查阅文件、自定义安装等功能。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
在线编程实现!如何在Java后端通过DockerClient操作Docker生成python环境
课时6:Java编程起步
课时6:Java编程起步,主讲人李兴华。课程摘要:介绍Java编程的第一个程序“Hello World”,讲解如何使用记事本或EditPlus编写、保存和编译Java源代码(*.java文件),并解释类定义、主方法(public static void main)及屏幕打印(System.out.println)。强调类名与文件名一致的重要性,以及Java程序的编译和执行过程。通过实例演示,帮助初学者掌握Java编程的基本步骤和常见问题。
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
132 5
Java 并发编程——volatile 关键字解析
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
128 12
|
4月前
|
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
360 2
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等