一天一个 JUC 工具类 -- AQS

简介: AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。

AQS

AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。

源码分析

       private transient volatile Node head;

   private transient volatile Node tail;
   static final class Node {
    

       static final Node SHARED = new Node();

       static final Node EXCLUSIVE = null;


       static final int CANCELLED =  1;

       static final int SIGNAL    = -1;

       static final int CONDITION = -2;

       static final int PROPAGATE = -3;


       volatile int waitStatus;

       volatile Node prev;

       volatile Node next;

       volatile Thread thread;

       Node nextWaiter;

       /**
        * Returns true if node is waiting in shared mode.
        */
       final boolean isShared() {
    
           return nextWaiter == SHARED;
       }

       final Node predecessor() throws NullPointerException {
    
           Node p = prev;
           if (p == null)
               throw new NullPointerException();
           else
               return p;
       }

       Node() {
        // Used to establish initial head or SHARED marker
       }

       Node(Thread thread, Node mode) {
         // Used by addWaiter
           this.nextWaiter = mode;
           this.thread = thread;
       }

       Node(Thread thread, int waitStatus) {
     // Used by Condition
           this.waitStatus = waitStatus;
           this.thread = thread;
       }
   }

AQS的核心数据结构是一个双向链表(变量 head 和 tail),称为等待队列 。我们来看一下AQS 内部定义的 Node 节点里面都有哪些东西

常量 SHAREDEXCLUSIVE 是区分等待队列中的两种类型:独占节点(exclusive node)和共享节点(shared node)。独占节点用于独占式同步,共享节点用于共享式同步(例如Semaphore、CountDownLatch等)。接下来是几个状态量来表明当前 node 的状态:


               static final int CANCELLED =  1;

       static final int SIGNAL    = -1;

       static final int CONDITION = -2;

       static final int PROPAGATE = -3;

               volatile int waitStatus;

这几个量都是 waitStatus 可能的值

  1. SIGNAL:当前节点的后继节点被阻塞(通过 park),因此当前节点在释放或取消时必须唤醒它的后继节点。为避免竞争,获取操作必须首先指示它们需要一个信号,然后重试原子获取操作,在失败时进行阻塞。
  2. CANCELLED:该节点由于超时或中断而被取消。节点一旦处于该状态,将永远不会再次阻塞。特别地,拥有被取消节点的线程将不再阻塞。
  3. CONDITION:该节点当前位于条件队列中。在被传输之前,它不会被用作同步队列节点。一旦被传输,该节点的状态将被设置为 0。(在这里使用该值与字段的其他用途无关,但简化了机制。)
  4. PROPAGATE:应该将 releaseShared 操作传播给其他节点。在 doReleaseShared 方法中,对于头节点,设置此值以确保传播继续,即使此后有其他操作介入。
  5. 0:上述情况均不满足。对于非负数值,表示节点无需发出信号。因此,大多数代码无需检查特定的值,只需检查 SIGNAL 即可。该字段对于普通同步节点初始化为 0,对于条件节点初始化为 CONDITION。可以使用 CAS(或在可能时,无条件的 volatile 写入)来修改该字段的值。

对于独占节点(exclusive node)和共享节点(shared node)的解释:

独占节点(exclusive node)和共享节点(shared node)。这两种节点分别用于实现独占式同步和共享式同步。在多线程环境下,AQS利用这些节点实现对锁和资源的安全管理和控制。

独占节点(exclusive node):

独占节点用于独占式同步,如ReentrantLock等可重入锁。在独占模式下,同一时刻只允许一个线程获取锁,其他线程需要等待,直到获取锁的线程释放锁。独占节点继承自AQS的内部类Node。

共享节点(shared node):

共享节点用于共享式同步,如CountDownLatch、Semaphore等。在共享模式下,允许多个线程同时获取资源,一般用于计数器等场景。共享节点同样继承自AQS的内部类Node。


acquire()release() 是 AQS 中的两个核心方法,用于实现同步器的获取和释放操作。它们是用来实现独占式同步的关键方法。

1. acquire() 方法:

public final void acquire(int arg) {
   if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
       selfInterrupt();
   }
}
  • arg:表示获取同步状态的参数,具体含义由具体的同步器实现类决定。

acquire() 方法分为两个步骤:tryAcquire()acquireQueued()

  • tryAcquire():尝试以独占模式获取同步状态。该方法由具体的同步器实现类重写,用于尝试获取同步状态。如果成功获取了同步状态(即返回 true),则该方法直接返回。如果没有成功获取同步状态(即返回 false),则继续执行后续步骤。
  • acquireQueued():将当前线程加入等待队列,并以独占模式获取同步状态。该方法会在等待队列中排队,并尝试获取同步状态,直到成功获取为止。它是一个自旋操作,即不断尝试获取锁直到成功。在等待队列中排队的过程中,如果线程被中断,则会退出自旋,并清除中断状态。
  • addWaiter(Node mode):将当前线程以指定模式(独占模式或共享模式)加入等待队列。该方法会创建一个新的节点,并将其插入到等待队列的尾部。
  • selfInterrupt():如果当前线程被中断,则自我中断。在等待队列中自旋的过程中,如果发现当前线程被中断,则会调用该方法中断自己,以响应外部的中断请求。

2. release() 方法:

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0) {
           unparkSuccessor(h);
       }
       return true;
   }
   return false;
}
  • arg:表示释放同步状态的参数,具体含义由具体的同步器实现类决定。

  • release() 方法分为两个步骤:tryRelease()unparkSuccessor()

  • tryRelease():尝试以独占模式释放同步状态。该方法由具体的同步器实现类重写,用于尝试释放同步状态。如果成功释放了同步状态(即返回 true),则继续执行后续步骤。如果没有成功释放同步状态(即返回 false),则不进行后续操作。

  • unparkSuccessor(Node node):唤醒后继节点。当线程释放锁时,它会调用 unparkSuccessor() 方法来唤醒等待队列中的后继节点,使其有机会继续尝试获取同步状态。该方法会将后继节点的等待状态设为 0(Node.SIGNAL)并尝试唤醒后继节点。

release() 方法通常在释放锁或资源时调用,它会首先尝试释放同步状态,如果成功释放,则唤醒等待队列中的后继节点,使其有机会获取同步状态。

这些方法的实现会依赖具体的同步器类型,因为不同的同步器有不同的获取和释放逻辑。在使用 AQS 时,开发者需要根据具体的业务需求来实现这些方法,以实现线程的同步和协作。

自己手写一个共享锁

基于 AQS 实现一个共享式的同步锁,我们创建一个新的类 SharedLock。SharedLock 继承自 AQS 并实现tryAcquireShared()、tryReleaseShared() 方法

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SharedLock extends AbstractQueuedSynchronizer {
    
   // 构造方法,指定共享资源数量
   public SharedLock(int resources) {
    
       setState(resources);
   }

   // 获取共享资源
   public void acquireShared() {
    
       acquireShared(1);
   }

   // 释放共享资源
   public void releaseShared() {
    
       releaseShared(1);
   }

   @Override
   protected int tryAcquireShared(int arg) {
    
       // 获取当前同步状态
       int currentState = getState();

       // 如果当前资源数量为0,或者请求资源数量大于当前资源数量,则返回负值,表示获取失败
       if (currentState == 0 || arg > currentState) {
    
           return -1;
       }

       // 尝试更新同步状态,获取资源
       int newState = currentState - arg;
       if (compareAndSetState(currentState, newState)) {
    
           return newState;
       }

       return -1; // 获取资源失败
   }

   @Override
   protected boolean tryReleaseShared(int arg) {
    
       // 释放共享资源,直接增加同步状态
       for (;;) {
    
           int currentState = getState();
           int newState = currentState + arg;
           if (compareAndSetState(currentState, newState)) {
    
               return true;
           }
       }
   }
}
  • 构造方法 SharedLock(int resources):创建一个共享锁,并指定共享资源的数量。
  • acquireShared() 方法:获取共享资源。如果当前没有可用的资源,则线程会进入等待状态。
  • releaseShared() 方法:释放共享资源。
  • tryAcquireShared(int arg) 方法:尝试获取共享资源。arg 表示请求的资源数量。如果当前可用的资源数量不足以满足请求,则返回负值表示获取失败,否则更新同步状态并返回新的资源数量。
  • tryReleaseShared(int arg) 方法:尝试释放共享资源。arg 表示释放的资源数量。直接增加同步状态,表示释放资源。

下面使用一下我们自定义的共享锁

我们创建了一个包含 5 个共享资源的 SharedLock 对象,并创建了 10 个线程来获取和释放共享资源。每个线程会尝试获取一个共享资源,然后执行一段休眠时间后再释放该资源。

public class Main {
    
   public static void main(String[] args) {
    
       int totalResources = 5;
       SharedLock sharedLock = new SharedLock(totalResources);

       Runnable runnable = () -> {
    
           sharedLock.acquireShared();
           System.out.println(Thread.currentThread().getName() + " acquired the shared resource.");
           try {
    
               Thread.sleep(1000);
           } catch (InterruptedException e) {
    
               e.printStackTrace();
           }
           sharedLock.releaseShared();
           System.out.println(Thread.currentThread().getName() + " released the shared resource.");
       };

       for (int i = 0; i < 10; i++) {
    
           Thread thread = new Thread(runnable);
           thread.start();
       }
   }
}

注意: 共享锁允许多个线程同时获取资源,只要可用资源数量足够。当可用资源数量不足时,线程将进入等待状态,直到有足够的资源为止

总结

AQS的底层原理:

  1. 基于CAS(Compare and Swap): AQS主要利用了CAS操作(即compareAndSet()方法)来实现对共享变量state的原子更新。CAS是一种乐观锁机制,通过比较当前值与期望值是否相等,若相等则执行更新操作,否则重新尝试。
  2. 等待队列: 等待队列是AQS实现线程同步的关键数据结构,它采用双向链表来存储等待线程节点。当线程需要获取锁时,如果锁被其他线程持有,则该线程会进入等待队列中,并挂起。当锁释放时,AQS会从等待队列中唤醒某个线程,使其重新尝试获取锁。
  3. 独占模式和共享模式: AQS支持两种同步模式:独占模式和共享模式。独占模式适用于只允许一个线程访问资源的场景,如ReentrantLock。共享模式适用于允许多个线程同时访问资源的场景,如Semaphore和CountDownLatch。
相关文章
|
4月前
|
安全 Java API
JavaEE初阶 CAS,JUC的一些简单理解,包含concurrent, ReentrantLock,Semaphore以及ConcurrentHashMap
JavaEE初阶 CAS,JUC的一些简单理解,包含concurrent, ReentrantLock,Semaphore以及ConcurrentHashMap
36 0
|
7月前
|
消息中间件 监控 Java
JUC第二十六讲:JUC工具类: CountDownLatch详解
JUC第二十六讲:JUC工具类: CountDownLatch详解
|
3月前
|
安全 Java API
JUC的常见类
JUC的常见类
23 0
|
10月前
|
存储 安全 算法
一天一个 JUC 工具类 -- 并发集合
使用JUC工具包中的并发集合,我们可以避免手动处理锁和同步的复杂性,从而降低出现线程安全问题的概率。这些并发集合通过内部采用高效的算法和数据结构来优化并发操作,从而提供更好的性能和扩展性。
|
7月前
JUC第二十八讲:JUC工具类: Semaphore详解
JUC第二十八讲:JUC工具类: Semaphore详解
|
7月前
|
API
JUC第二十七讲:JUC工具类: CyclicBarrier详解
JUC第二十七讲:JUC工具类: CyclicBarrier详解
|
9月前
|
安全
JUC--初识
摩尔定律
|
9月前
|
Java 编译器 调度
JUC--多线程
多线程包含
|
12月前
|
安全
JUC中的常见类
JUC中的常见类
|
12月前
|
Java API 索引
【JUC基础】08. 三大工具类
JUC包中包含了三个非常实用的工具类:CountDownLatch(倒计数器),CyclicBarrier(循环栅栏),Semaphore(信号量)。
106 0