【Java并发】【AQS】适合初学者体质的AQS入门

简介: AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。 那说到研究AQS,那我们应该,使用开始说起🤓 入门 什么是AQS? AQS(Abst

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中... 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》

前言

AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。

那说到研究AQS,那我们应该,使用开始说起🤓

入门

什么是AQS?

AQS(AbstractQueuedSynchronizer)是 Java 并发编程中一个非常重要的基础框架,它是 Java 并发包(java.util.concurrent)中许多同步工具(如 ReentrantLockSemaphoreCountDownLatch 等)的核心实现。

AQS 通过 共享资源状态(state)线程等待队列(CLH 队列) 实现同步机制,核心逻辑是:

  1. 原子性管理状态(state) :通过 volatile int state 表示资源状态(例如锁是否被持有、信号量剩余数量)。不同场景下的含义:

    • ReentrantLockstate=0 表示未加锁,state>0 表示锁被重入的次数。
    • Semaphorestate 表示剩余许可证数量。
    • CountDownLatchstate 表示剩余未完成的计数。
  2. 线程排队与唤醒:未获取资源的线程进入队列等待,资源释放时按策略唤醒队列中的线程。

    • 一个双向链表实现的 FIFO 队列,存储等待资源的线程。
    • 每个节点(Node)封装一个线程及其等待状态(如是否被取消)。

AQS 的两种模式

1. 独占模式(Exclusive)

同一时刻只有一个线程能获取资源,如 ReentrantLock。.

boolean tryAcquire(int arg)   // 尝试获取资源(需子类实现)
boolean tryRelease(int arg)   // 尝试释放资源(需子类实现)

2. 共享模式(Shared)

多个线程可以同时获取资源,如 SemaphoreCountDownLatch

int tryAcquireShared(int arg) // 尝试获取共享资源(返回剩余可用数量)
boolean tryReleaseShared(int arg) // 尝试释放共享资源

为什么要用AQS?

因为方便!!

如果没有AQS,我们将要自己解决以下问题:

  1. 线程的阻塞和唤醒:你需要自己实现一个队列来管理等待的线程,并确保线程可以公平地获取资源。
  2. 状态管理:你需要自己管理同步状态,并确保状态的操作是线程安全的。
  3. 避免竞争条件:你需要使用 CAS 操作或其他同步机制来避免竞争条件。
  4. 性能优化:你需要优化线程的阻塞和唤醒机制,以减少上下文切换的开销。

使用AQS

继承AbstractQueuedSynchronizer类,重写方法,这里有没有uu感受到模版方法设计模式呢?

不了解模板方法的uu,可以看看这篇:【设计模式】【行为型模式】模板方法模式(Template Method)-CSDN博客

独占模式

  • acquire(int arg):获取资源的模板方法。tryAcquire(arg):由子类重写(如 ReentrantLock 定义如何获取锁)。
  • release(int arg):释放资源的模板方法。tryRelease(arg):由子类重写(如 ReentrantLock 定义如何释放锁)。
// 需要我们重写获取资源 
protected boolean tryAcquire(int arg) {
   
    throw new UnsupportedOperationException();
}

// 占有资源的模版方法
public final void acquire(int arg) {
   
    if (!tryAcquire(arg) &&      // 子类实现:尝试获取资源
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // AQS 实现:排队等待
        selfInterrupt();
}

// 需要我们重写释放资源
protected boolean tryRelease(int arg) {
   
    throw new UnsupportedOperationException();
}

// 释放资源的模板方法
public final boolean release(int arg) {
   
    if (tryRelease(arg)) {
          // 子类实现:尝试释放资源
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);   // AQS 实现:唤醒后续线程
        return true;
    }
    return false;
}

共享模式

  • acquireShared(int arg)releaseShared(int arg)
    模板方法逻辑类似,调用子类实现的 tryAcquireShared()tryReleaseShared()
// 共享模式获取资源的入口方法(模板方法)
public final void acquireShared(int arg) {
   
    // 调用子类实现的 tryAcquireShared 方法
    if (tryAcquireShared(arg) < 0)
        // AQS 实现的共享模式排队逻辑
        doAcquireShared(arg);
}

// 共享模式释放资源的入口方法(模板方法)
public final boolean releaseShared(int arg) {
   
    // 调用子类实现的 tryReleaseShared 方法
    if (tryReleaseShared(arg)) {
   
        // AQS 实现的共享模式释放逻辑
        doReleaseShared();
        return true;
    }
    return false;
}

上面介绍的非常重要,不然会有略微影响,看下面的内容。

下面会使用到AQS的独占模式共享模式, 来简单实现JUC框架的一些功能,跟上主播的节奏😘

实现一个简单的独占锁

独占锁:用于确保同一时刻只有一个线程可以获取资源。

继承 AbstractQueuedSynchronizer 并重写 tryAcquire/tryRelease 方法。

public class SimpleLock {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        @Override
        protected boolean tryAcquire(int arg) {
   
            return compareAndSetState(0, 1); // 尝试获取锁
        }

        @Override
        protected boolean tryRelease(int arg) {
   
            setState(0); // 释放锁
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
   
            return getState() == 1; // 判断锁是否被持有
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
   
        sync.acquire(1); // 获取锁
    }

    public void unlock() {
   
        sync.release(1); // 释放锁
    }

    public boolean isLocked() {
   
        return sync.isHeldExclusively(); // 检查锁状态
    }
}

使用

public class SimpleLockExample {
   
    private static final SimpleLock lock = new SimpleLock();

    public static void main(String[] args) {
   
        Runnable task = () -> {
   
            lock.lock();
            try {
   
                System.out.println(Thread.currentThread().getName() + " acquired the lock");
                Thread.sleep(1000); // 模拟操作
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " released the lock");
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();
    }
}

实现一个信号量(Semaphore)

信号量用于控制同时访问某个资源的线程数量。

public class SimpleSemaphore {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        Sync(int permits) {
   
            setState(permits); // 初始化信号量许可数
        }

        @Override
        protected int tryAcquireShared(int acquires) {
   
            for (;;) {
   
                int available = getState(); // 当前可用的许可数
                int remaining = available - acquires; // 获取后的剩余许可数
                if (remaining < 0 || compareAndSetState(available, remaining)) {
   
                    return remaining; // 返回剩余许可数
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
   
            for (;;) {
   
                int current = getState(); // 当前可用的许可数
                int next = current + releases; // 释放后的许可数
                if (compareAndSetState(current, next)) {
   
                    return true; // 释放成功
                }
            }
        }
    }

    private final Sync sync;

    public SimpleSemaphore(int permits) {
   
        if (permits < 0) {
   
            throw new IllegalArgumentException("Permits must be non-negative");
        }
        this.sync = new Sync(permits);
    }

    public void acquire() {
   
        sync.acquireShared(1); // 获取一个许可
    }

    public void release() {
   
        sync.releaseShared(1); // 释放一个许可
    }
}

使用

public class SemaphoreExample {
   
    private static final SimpleSemaphore semaphore = new SimpleSemaphore(2); // 允许2个线程同时访问

    public static void main(String[] args) {
   
        Runnable task = () -> {
   
            semaphore.acquire();
            try {
   
                System.out.println(Thread.currentThread().getName() + " acquired the permit");
                Thread.sleep(1000); // 模拟操作
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " released the permit");
            }
        };

        for (int i = 0; i < 4; i++) {
   
            new Thread(task).start();
        }
    }
}

实现一个倒计时门闩(CountDownLatch)

倒计时门闩用于让一个或多个线程等待其他线程完成操作。

public class SimpleCountDownLatch {
   
    private static class Sync extends AbstractQueuedSynchronizer {
   
        Sync(int count) {
   
            setState(count); // 初始化计数器
        }

        @Override
        protected int tryAcquireShared(int acquires) {
   
            return getState() == 0 ? 1 : -1; // 如果计数器为0,返回1(成功),否则返回-1(失败)
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
   
            for (;;) {
   
                int current = getState();
                if (current == 0) {
   
                    return false; // 计数器已经为0,无法再减少
                }
                int next = current - 1;
                if (compareAndSetState(current, next)) {
   
                    return next == 0; // 返回是否计数器减到0
                }
            }
        }
    }

    private final Sync sync;

    public SimpleCountDownLatch(int count) {
   
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
   
        sync.acquireSharedInterruptibly(1); // 等待计数器减到0
    }

    public void countDown() {
   
        sync.releaseShared(1); // 计数器减1
    }
}

我们重写的tryAcquireShared被调用的位置。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
   
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


public final boolean releaseShared(int arg) {
   
    if (tryReleaseShared(arg)) {
   
        doReleaseShared();
        return true;
    }
    return false;
}

使用

public class CountDownLatchExample {
    private static final SimpleCountDownLatch latch = new SimpleCountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() + " is running");
            latch.countDown(); // 计数器减1
        };

        for (int i = 0; i < 3; i++) {
            new Thread(task).start();
        }

        latch.await(); // 等待计数器减到0
        System.out.println("All tasks are done!");
    }
}

实现条件变量(Condition)

  • 锁与条件的绑定

    • 条件变量 Condition 必须与锁(AQS)绑定,因为 await()signal() 必须在持有锁的情况下调用。
  • 线程状态管理

    • AQS 内部通过 ConditionObject 管理条件队列,通过 acquirerelease 管理同步队列。
  • 条件队列与同步队列的交互
    • 当线程调用 await() 时,它会被放入条件队列并释放锁。
    • 当其他线程调用 signal() 时,条件队列中的线程会被移动到同步队列,等待重新获取锁。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class SimpleLockWithCondition {
   
    // 继承 AQS 并实现一个简单的独占锁
    private static class Sync extends AbstractQueuedSynchronizer {
   
        @Override
        protected boolean tryAcquire(int arg) {
   
            return compareAndSetState(0, 1); // 尝试获取锁
        }

        @Override
        protected boolean tryRelease(int arg) {
   
            setState(0); // 释放锁
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
   
            return getState() == 1; // 锁是否被持有
        }

        // 直接使用 AQS 内置的 ConditionObject
        public Condition newCondition() {
   
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
   
        sync.acquire(1); // 获取锁
    }

    public void unlock() {
   
        sync.release(1); // 释放锁
    }

    public Condition newCondition() {
   
        return sync.newCondition(); // 返回 AQS 的条件队列
    }
}

测试使用

public class ConditionExample {
   
    private static final SimpleLockWithCondition lock = new SimpleLockWithCondition();
    private static final Condition condition = lock.newCondition();
    private static boolean flag = false;

    public static void main(String[] args) {
   
        // 线程1:等待条件满足
        Thread waiter = new Thread(() -> {
   
            lock.lock();
            try {
   
                System.out.println("Thread-1: 等待条件满足...");
                while (!flag) {
   
                    condition.await(); // 释放锁并等待(AQS 内部管理条件队列)
                }
                System.out.println("Thread-1: 条件已满足!");
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
            }
        });

        // 线程2:修改条件并唤醒等待线程
        Thread signaler = new Thread(() -> {
   
            lock.lock();
            try {
   
                Thread.sleep(1000); // 模拟耗时操作
                flag = true;
                System.out.println("Thread-2: 条件已修改,唤醒等待线程");
                condition.signalAll(); // 唤醒所有等待线程(AQS 内部将线程从条件队列移到同步队列)
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                lock.unlock();
            }
        });

        waiter.start();
        signaler.start();
    }
}

后话

怎么样,是不是对AQS有点略微的印象了呢?知道它能怎么用的就行了。

什么就结束了?不对的。根据主播以往的规律,下一篇就是原理or源码了。

点上关注,主播马上回来!!!

目录
相关文章
|
8天前
|
存储 安全 Java
【Java并发】【原子类】适合初学体质的原子类入门
什么是CAS? 说到原子类,首先就要说到CAS: CAS(Compare and Swap) 是一种无锁的原子操作,用于实现多线程环境下的安全数据更新。 CAS(Compare and Swap) 的
42 15
【Java并发】【原子类】适合初学体质的原子类入门
|
5天前
|
缓存 安全 Java
【Java并发】【ConcurrentHashMap】适合初学体质的ConcurrentHashMap入门
ConcurrentHashMap是Java中线程安全的哈希表实现,支持高并发读写操作。相比Hashtable,它通过分段锁(JDK1.7)或CAS+synchronized(JDK1.8)实现更细粒度锁控制,提升性能与安全性。本文详细介绍其构造方法、添加/获取/删除元素等常用操作,并对比JDK1.7和1.8的区别,帮助开发者深入理解与使用ConcurrentHashMap。欢迎关注,了解更多!
35 3
【Java并发】【ConcurrentHashMap】适合初学体质的ConcurrentHashMap入门
|
9天前
|
Java
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
前言 有了前文对简单实用的学习 【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门 聪明的你,一定会想知道更多。哈哈哈哈哈,下面主播就...
40 6
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
|
10天前
|
安全 Java
【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门
什么是ArrayBlockingQueue ArrayBlockingQueue是 Java 并发编程中一个基于数组实现的有界阻塞队列,属于 java.util.concurrent 包,实现了 Bl...
44 6
【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门
|
10天前
|
安全 Java
【源码】【Java并发】【ArrayBlockingQueue】适合中学者体质的ArrayBlockingQueue
前言 通过之前的学习是不是学的不过瘾,没关系,马上和主播来挑战源码的阅读 【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门 还有一件事
40 5
【源码】【Java并发】【ArrayBlockingQueue】适合中学者体质的ArrayBlockingQueue
|
9天前
|
安全 Java
【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门
前言 你是否在线程池工具类里看到过它的身影? 你是否会好奇LinkedBlockingQueue是啥呢? 没有关系,小手手点上关注,跟上主播的节奏。 什么是LinkedBlockingQueue? ...
36 1
【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门
|
11月前
|
安全 Java
从零开始学习 Java:简单易懂的入门指南之不可变集合、方法引用(二十六)
从零开始学习 Java:简单易懂的入门指南之不可变集合、方法引用(二十六)
|
10月前
|
存储 Java API
Java——Stream流(1/2):Stream流入门、Stream流的创建(认识Stream、体验Stream流、Stream流的使用步骤、获取Stream流的方法)
Java——Stream流(1/2):Stream流入门、Stream流的创建(认识Stream、体验Stream流、Stream流的使用步骤、获取Stream流的方法)
178 0
|
Java 索引
从零开始学习 Java:简单易懂的入门指南之方法(六)
方法的概念:方法(method)是程序中最小的执行单元注意:方法必须先创建才可以使用,该过程成为方法定义,方法创建后并不是直接可以运行的,需要手动使用后,才执行,该过程成为方法调用
从零开始学习 Java:简单易懂的入门指南之方法(六)

热门文章

最新文章