【Java并发】【AQS】适合初学者体质的AQS入门

简介: AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。 那说到研究AQS,那我们应该,使用开始说起🤓 入门 什么是AQS? AQS(Abst

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中... 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》

前言

AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。

那说到研究AQS,那我们应该,使用开始说起🤓

入门

什么是AQS?

AQS(AbstractQueuedSynchronizer)是 Java 并发编程中一个非常重要的基础框架,它是 Java 并发包(java.util.concurrent)中许多同步工具(如 ReentrantLockSemaphoreCountDownLatch 等)的核心实现。

AQS 通过 共享资源状态(state)线程等待队列(CLH 队列) 实现同步机制,核心逻辑是:

  1. 原子性管理状态(state) :通过 volatile int state 表示资源状态(例如锁是否被持有、信号量剩余数量)。不同场景下的含义:

    • ReentrantLockstate=0 表示未加锁,state>0 表示锁被重入的次数。
    • Semaphorestate 表示剩余许可证数量。
    • CountDownLatchstate 表示剩余未完成的计数。
  2. 线程排队与唤醒:未获取资源的线程进入队列等待,资源释放时按策略唤醒队列中的线程。

    • 一个双向链表实现的 FIFO 队列,存储等待资源的线程。
    • 每个节点(Node)封装一个线程及其等待状态(如是否被取消)。

AQS 的两种模式

1. 独占模式(Exclusive)

同一时刻只有一个线程能获取资源,如 ReentrantLock。.

boolean tryAcquire(int arg)   // 尝试获取资源(需子类实现)
boolean tryRelease(int arg)   // 尝试释放资源(需子类实现)
AI 代码解读

2. 共享模式(Shared)

多个线程可以同时获取资源,如 SemaphoreCountDownLatch

int tryAcquireShared(int arg) // 尝试获取共享资源(返回剩余可用数量)
boolean tryReleaseShared(int arg) // 尝试释放共享资源
AI 代码解读

为什么要用AQS?

因为方便!!

如果没有AQS,我们将要自己解决以下问题:

  1. 线程的阻塞和唤醒:你需要自己实现一个队列来管理等待的线程,并确保线程可以公平地获取资源。
  2. 状态管理:你需要自己管理同步状态,并确保状态的操作是线程安全的。
  3. 避免竞争条件:你需要使用 CAS 操作或其他同步机制来避免竞争条件。
  4. 性能优化:你需要优化线程的阻塞和唤醒机制,以减少上下文切换的开销。

使用AQS

继承AbstractQueuedSynchronizer类,重写方法,这里有没有uu感受到模版方法设计模式呢?

不了解模板方法的uu,可以看看这篇:【设计模式】【行为型模式】模板方法模式(Template Method)-CSDN博客

独占模式

  • acquire(int arg):获取资源的模板方法。tryAcquire(arg):由子类重写(如 ReentrantLock 定义如何获取锁)。
  • release(int arg):释放资源的模板方法。tryRelease(arg):由子类重写(如 ReentrantLock 定义如何释放锁)。
// 需要我们重写获取资源 
protected boolean tryAcquire(int arg) {
   
    throw new UnsupportedOperationException();
}

// 占有资源的模版方法
public final void acquire(int arg) {
   
    if (!tryAcquire(arg) &&      // 子类实现:尝试获取资源
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // AQS 实现:排队等待
        selfInterrupt();
}

// 需要我们重写释放资源
protected boolean tryRelease(int arg) {
   
    throw new UnsupportedOperationException();
}

// 释放资源的模板方法
public final boolean release(int arg) {
   
    if (tryRelease(arg)) {
          // 子类实现:尝试释放资源
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);   // AQS 实现:唤醒后续线程
        return true;
    }
    return false;
}
AI 代码解读

共享模式

  • acquireShared(int arg)releaseShared(int arg)
    模板方法逻辑类似,调用子类实现的 tryAcquireShared()tryReleaseShared()
// 共享模式获取资源的入口方法(模板方法)
public final void acquireShared(int arg) {
   
    // 调用子类实现的 tryAcquireShared 方法
    if (tryAcquireShared(arg) < 0)
        // AQS 实现的共享模式排队逻辑
        doAcquireShared(arg);
}

// 共享模式释放资源的入口方法(模板方法)
public final boolean releaseShared(int arg) {
   
    // 调用子类实现的 tryReleaseShared 方法
    if (tryReleaseShared(arg)) {
   
        // AQS 实现的共享模式释放逻辑
        doReleaseShared();
        return true;
    }
    return false;
}
AI 代码解读

上面介绍的非常重要,不然会有略微影响,看下面的内容。

下面会使用到AQS的独占模式共享模式, 来简单实现JUC框架的一些功能,跟上主播的节奏😘

实现一个简单的独占锁

独占锁:用于确保同一时刻只有一个线程可以获取资源。

继承 AbstractQueuedSynchronizer 并重写 tryAcquire/tryRelease 方法。

public class SimpleLock {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        @Override
        protected boolean tryAcquire(int arg) {
   
            return compareAndSetState(0, 1); // 尝试获取锁
        }

        @Override
        protected boolean tryRelease(int arg) {
   
            setState(0); // 释放锁
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
   
            return getState() == 1; // 判断锁是否被持有
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
   
        sync.acquire(1); // 获取锁
    }

    public void unlock() {
   
        sync.release(1); // 释放锁
    }

    public boolean isLocked() {
   
        return sync.isHeldExclusively(); // 检查锁状态
    }
}
AI 代码解读

使用

public class SimpleLockExample {
   
    private static final SimpleLock lock = new SimpleLock();

    public static void main(String[] args) {
   
        Runnable task = () -> {
   
            lock.lock();
            try {
   
                System.out.println(Thread.currentThread().getName() + " acquired the lock");
                Thread.sleep(1000); // 模拟操作
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " released the lock");
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();
    }
}
AI 代码解读

实现一个信号量(Semaphore)

信号量用于控制同时访问某个资源的线程数量。

public class SimpleSemaphore {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        Sync(int permits) {
   
            setState(permits); // 初始化信号量许可数
        }

        @Override
        protected int tryAcquireShared(int acquires) {
   
            for (;;) {
   
                int available = getState(); // 当前可用的许可数
                int remaining = available - acquires; // 获取后的剩余许可数
                if (remaining < 0 || compareAndSetState(available, remaining)) {
   
                    return remaining; // 返回剩余许可数
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
   
            for (;;) {
   
                int current = getState(); // 当前可用的许可数
                int next = current + releases; // 释放后的许可数
                if (compareAndSetState(current, next)) {
   
                    return true; // 释放成功
                }
            }
        }
    }

    private final Sync sync;

    public SimpleSemaphore(int permits) {
   
        if (permits < 0) {
   
            throw new IllegalArgumentException("Permits must be non-negative");
        }
        this.sync = new Sync(permits);
    }

    public void acquire() {
   
        sync.acquireShared(1); // 获取一个许可
    }

    public void release() {
   
        sync.releaseShared(1); // 释放一个许可
    }
}
AI 代码解读

使用

public class SemaphoreExample {
   
    private static final SimpleSemaphore semaphore = new SimpleSemaphore(2); // 允许2个线程同时访问

    public static void main(String[] args) {
   
        Runnable task = () -> {
   
            semaphore.acquire();
            try {
   
                System.out.println(Thread.currentThread().getName() + " acquired the permit");
                Thread.sleep(1000); // 模拟操作
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " released the permit");
            }
        };

        for (int i = 0; i < 4; i++) {
   
            new Thread(task).start();
        }
    }
}
AI 代码解读

实现一个倒计时门闩(CountDownLatch)

倒计时门闩用于让一个或多个线程等待其他线程完成操作。

public class SimpleCountDownLatch {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        Sync(int count) {
   
            setState(count); // 初始化计数器
        }

        @Override
        protected int tryAcquireShared(int acquires) {
   
            return getState() == 0 ? 1 : -1; // 如果计数器为0,返回1(成功),否则返回-1(失败)
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
   
            for (;;) {
   
                int current = getState();
                if (current == 0) {
   
                    return false; // 计数器已经为0,无法再减少
                }
                int next = current - 1;
                if (compareAndSetState(current, next)) {
   
                    return next == 0; // 返回是否计数器减到0
                }
            }
        }
    }

    private final Sync sync;

    public SimpleCountDownLatch(int count) {
   
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
   
        sync.acquireSharedInterruptibly(1); // 等待计数器减到0
    }

    public void countDown() {
   
        sync.releaseShared(1); // 计数器减1
    }
}
AI 代码解读

我们重写的tryAcquireShared被调用的位置。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
   
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


public final boolean releaseShared(int arg) {
   
    if (tryReleaseShared(arg)) {
   
        doReleaseShared();
        return true;
    }
    return false;
}
AI 代码解读

使用

public class CountDownLatchExample {
    private static final SimpleCountDownLatch latch = new SimpleCountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() + " is running");
            latch.countDown(); // 计数器减1
        };

        for (int i = 0; i < 3; i++) {
            new Thread(task).start();
        }

        latch.await(); // 等待计数器减到0
        System.out.println("All tasks are done!");
    }
}
AI 代码解读

实现条件变量(Condition)

  • 锁与条件的绑定

    • 条件变量 Condition 必须与锁(AQS)绑定,因为 await()signal() 必须在持有锁的情况下调用。
  • 线程状态管理

    • AQS 内部通过 ConditionObject 管理条件队列,通过 acquirerelease 管理同步队列。
  • 条件队列与同步队列的交互
    • 当线程调用 await() 时,它会被放入条件队列并释放锁。
    • 当其他线程调用 signal() 时,条件队列中的线程会被移动到同步队列,等待重新获取锁。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class SimpleLockWithCondition {
   
    // 继承 AQS 并实现一个简单的独占锁
    private static class Sync extends AbstractQueuedSynchronizer {
   
        @Override
        protected boolean tryAcquire(int arg) {
   
            return compareAndSetState(0, 1); // 尝试获取锁
        }

        @Override
        protected boolean tryRelease(int arg) {
   
            setState(0); // 释放锁
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
   
            return getState() == 1; // 锁是否被持有
        }

        // 直接使用 AQS 内置的 ConditionObject
        public Condition newCondition() {
   
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
   
        sync.acquire(1); // 获取锁
    }

    public void unlock() {
   
        sync.release(1); // 释放锁
    }

    public Condition newCondition() {
   
        return sync.newCondition(); // 返回 AQS 的条件队列
    }
}
AI 代码解读

测试使用

public class ConditionExample {
   
    private static final SimpleLockWithCondition lock = new SimpleLockWithCondition();
    private static final Condition condition = lock.newCondition();
    private static boolean flag = false;

    public static void main(String[] args) {
   
        // 线程1:等待条件满足
        Thread waiter = new Thread(() -> {
   
            lock.lock();
            try {
   
                System.out.println("Thread-1: 等待条件满足...");
                while (!flag) {
   
                    condition.await(); // 释放锁并等待(AQS 内部管理条件队列)
                }
                System.out.println("Thread-1: 条件已满足!");
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
            }
        });

        // 线程2:修改条件并唤醒等待线程
        Thread signaler = new Thread(() -> {
   
            lock.lock();
            try {
   
                Thread.sleep(1000); // 模拟耗时操作
                flag = true;
                System.out.println("Thread-2: 条件已修改,唤醒等待线程");
                condition.signalAll(); // 唤醒所有等待线程(AQS 内部将线程从条件队列移到同步队列)
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
            }
        });

        waiter.start();
        signaler.start();
    }
}
AI 代码解读

后话

怎么样,是不是对AQS有点略微的印象了呢?知道它能怎么用的就行了。

什么就结束了?不对的。根据主播以往的规律,下一篇就是原理or源码了。

点上关注,主播马上回来!!!

flzjkl
+关注
目录
打赏
0
8
8
0
82
分享
相关文章
|
23天前
|
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
154 60
【Java并发】【线程池】带你从0-1入门线程池
【原理】【Java并发】【synchronized】适合中学者体质的synchronized原理
本文深入解析了Java中`synchronized`关键字的底层原理,从代码块与方法修饰的区别到锁升级机制,内容详尽。通过`monitorenter`和`monitorexit`指令,阐述了`synchronized`实现原子性、有序性和可见性的原理。同时,详细分析了锁升级流程:无锁 → 偏向锁 → 轻量级锁 → 重量级锁,结合对象头`MarkWord`的变化,揭示JVM优化锁性能的策略。此外,还探讨了Monitor的内部结构及线程竞争锁的过程,并介绍了锁消除与锁粗化等优化手段。最后,结合实际案例,帮助读者全面理解`synchronized`在并发编程中的作用与细节。
34 8
【原理】【Java并发】【synchronized】适合中学者体质的synchronized原理
|
11天前
|
【Java并发】【synchronized】适合初学者体质入门的synchronized
欢迎来到我的Java线程同步入门指南!我不是外包员工,梦想是写高端CRUD。2025年我正在沉淀中,博客更新速度加快,欢迎点赞、收藏、关注。 本文介绍Java中的`synchronized`关键字,适合初学者。`synchronized`用于确保多个线程访问共享资源时不会发生冲突,避免竞态条件、保证内存可见性、防止原子性破坏及协调多线程有序访问。
47 8
【Java并发】【synchronized】适合初学者体质入门的synchronized
|
17天前
|
【Java并发】【volatile】适合初学者体质的volatile
当你阅读dalao的框架源码的时候,你是否会见到这样一个关键字 - - - volatie,诶,你是否会好奇,为什么要加它?加了它有什么作用?
56 14
【Java并发】【volatile】适合初学者体质的volatile
|
15天前
|
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是写出高端的CRUD应用。2025年,我正在沉淀自己,博客更新速度也在加快。在这里,我会分享关于Java并发编程的深入理解,尤其是volatile关键字的底层原理。 本文将带你深入了解Java内存模型(JMM),解释volatile如何通过内存屏障和缓存一致性协议确保可见性和有序性,同时探讨其局限性及优化方案。欢迎订阅专栏《在2B工作中寻求并发是否搞错了什么》,一起探索并发编程的奥秘! 关注我,点赞、收藏、评论,跟上更新节奏,让我们共同进步!
86 8
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
|
11天前
|
《从头开始学java,一天一个知识点》之:数组入门:一维数组的定义与遍历
**你是否也经历过这些崩溃瞬间?** - 看了三天教程,连`i++`和`++i`的区别都说不清 - 面试时被追问&quot;`a==b`和`equals()`的区别&quot;,大脑突然空白 - 写出的代码总是莫名报NPE,却不知道问题出在哪个运算符 这个系列就是为你打造的Java「速效救心丸」!我们承诺:每天1分钟,地铁通勤、午休间隙即可完成学习;直击痛点,只讲高频考点和实际开发中的「坑位」;拒绝臃肿,没有冗长概念堆砌,每篇都有可运行的代码标本。明日预告:《多维数组与常见操作》。 通过实例讲解数组的核心认知、趣味场景应用、企业级开发规范及优化技巧,帮助你快速掌握Java数组的精髓。
57 23
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
120 60
Java AQS 实现——Condition
本文着重介绍 AQS 的 Condition 实现方式。
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
64 23
|
19天前
|
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
90 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码