Java并发同步器AQS

简介: AQS是AbstractQueuedSynchronizer的简写,中文名应该叫抽象队列同步器(我给的名字,哈哈),出生于Java 1.5。一、什么是同步器多线程并发的执行,之间通过某种 共享 状态来同步,只有当状态满足 xxxx 条件,才能触发线程执行 xxxx 。

AQS是AbstractQueuedSynchronizer的简写,中文名应该叫抽象队列同步器(我给的名字,哈哈),出生于Java 1.5

一、什么是同步器

多线程并发的执行,之间通过某种 共享 状态来同步,只有当状态满足 xxxx 条件,才能触发线程执行 xxxx 。

这个共同的语义可以称之为同步器。可以认为以上所有的锁机制都可以基于同步器定制来实现的。

而juc(java.util.concurrent)里的思想是 将这些场景抽象出来的语义通过统一的同步框架来支持。

juc 里所有的这些锁机制都是基于 AQS ( AbstractQueuedSynchronizer )框架上构建的。下面简单介绍下 AQS( AbstractQueuedSynchronizer )。 可以参考Doug Lea的论文The java.util.concurrent Synchronizer Framework
我们来看下java.util.concurrent.locks大致结构

img_3c95bf2c071b255bfc61b6033720931e.jpe

上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为何图中没有用UML线表示呢,这是每个Lock实现类都持有自己内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为何要实现不同的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制。下文会举例说明不同同步器内的Sync与state实现。

二、AQS框架如何构建同步器

1、同步器的基本功能

一个同步器至少需要包含两个功能:

  1. 获取同步状态
    如果允许,则获取锁,如果不允许就阻塞线程,直到同步状态允许获取。

  2. 释放同步状态
    修改同步状态,并且唤醒等待线程。

aqs 同步机制同时考虑了如下需求:

  1. 独占锁和共享锁两种机制。

  2. 线程阻塞后,如果需要取消,需要支持中断。

  3. 线程阻塞后,如果有超时要求,应该支持超时后中断的机制。

2、同步状态的获取与释放

AQS实现了一个同步器的基本结构,下面以独占锁与共享锁分开讨论,来说明AQS怎样实现获取、释放同步状态。

2.1、独占模式

独占获取: tryAcquire 本身不会阻塞线程,如果返回 true 成功就继续,如果返回 false 那么就阻塞线程并加入阻塞队列。

public final void acquire(int arg) {  
  
        if (!tryAcquire(arg) &&  
  
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列  
  
            selfInterrupt();  
  
}

独占且可中断模式获取:支持中断取消

public final void acquireInterruptibly(int arg) throws InterruptedException {  
  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (!tryAcquire(arg))  
  
            doAcquireInterruptibly(arg);  
  
    } 

独占且支持超时模式获取: 带有超时时间,如果经过超时时间则会退出。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {  
  
     if (Thread.interrupted())  
  
         throw new InterruptedException();  
  
     return tryAcquire(arg) ||  
  
         doAcquireNanos(arg, nanosTimeout);  

独占模式释放:释放成功会唤醒后续节点

public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}

2.2、共享模式

共享模式获取

public final void acquireShared(int arg) {  
  
    if (tryAcquireShared(arg) < 0)  
  
        doAcquireShared(arg); 

可中断模式共享获取

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
        if (Thread.interrupted())  
            throw new InterruptedException();  
        if (tryAcquireShared(arg) < 0)  
            doAcquireSharedInterruptibly(arg);  
    }  

共享模式带定时获取

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {  
     if (Thread.interrupted())  
         throw new InterruptedException();  
     return tryAcquireShared(arg) >= 0 ||  
         doAcquireSharedNanos(arg, nanosTimeout);  
}  

共享锁释放


public final boolean releaseShared(int arg) {  
        if (tryReleaseShared(arg)) {  
            doReleaseShared();  
            return true;  
        }  
        return false;  
    }  

注意以上框架只定义了一个同步器的基本结构框架,的基本方法里依赖的 tryAcquire 、 tryRelease 、tryAcquireShared 、 tryReleaseShared 四个方法在 AQS 里没有实现,这四个方法不会涉及线程阻塞,而是由各自不同的使用场景根据情况来定制:

protected boolean tryAcquire(int arg) {  
    throw new UnsupportedOperationException();  
}  
protected boolean tryRelease(int arg) {  
    throw new UnsupportedOperationException();  
}  
protected int tryAcquireShared(int arg) {  
    throw new UnsupportedOperationException();  
  
}  
protected boolean tryReleaseShared(int arg) {  
    throw new UnsupportedOperationException();  
}

从以上源码可以看出AQS实现基本的功能:

AQS虽然实现了acquire,和release方法是可能阻塞的,但是里面调用的tryAcquire和tryRelease是由子类来定制的且是不阻塞的可。以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现。

3、状态获取、释放成功或失败的后续行为:线程的阻塞、唤醒机制

有别于wait和notiry。这里利用 jdk1.5 开始提供的 LockSupport.park() 和 LockSupport.unpark() 的本地方法实现,实现线程的阻塞和唤醒。

得到锁的线程禁用(park)和唤醒(unpark),也是直接native实现(这几个native方法的实现代码在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是关键代码park的最终实现是和操作系统相关的,比如windows下实现是在os_windows.cpp中,有兴趣的同学可以下载jdk源码查看)。唤醒一个被park()线程主要手段包括以下几种

  1. 其他线程调用以被park()线程为参数的unpark(Thread thread).
  2. 其他线程中断被park()线程,如waiters.peek().interrupt();waiters为存储线程对象的队列.
  3. 不知原因的返回。

park()方法返回并不会报告到底是上诉哪种返回,所以返回好最好检查下线程状态,如

LockSupport.park();  //禁用当前线程
if(Thread.interrupted){
//doSomething
}

AbstractQueuedSynchronizer(AQS)对于这点实现得相当巧妙,如下所示

private void doAcquireSharedInterruptibly(int arg)throwsInterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
         for (;;) {
             final Node p = node.predecessor();
             if (p == head) {
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     return;
                 }
            }
             //parkAndCheckInterrupt()会返回park住的线程在被unpark后的线程状态,如果线程中断,跳出循环。
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 break;
      }
     } catch (RuntimeException ex) {
          cancelAcquire(node);
          throw ex;
 }
 
     // 只有线程被interrupt后才会走到这里
     cancelAcquire(node);
     throw new InterruptedException();
}
 
//在park()住的线程被unpark()后,第一时间返回当前线程是否被打断
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

4、线程阻塞队列的维护

阻塞线程节点队列 CHL Node queue

根据论文里描述, AQS 里将阻塞线程封装到一个内部类 Node 里。并维护一个 CHL Node FIFO 队列。 CHL队列是一个非阻塞的 FIFO 队列,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。实现无锁且快速的插入。关于非阻塞算法可以参考 Java 理论与实践: 非阻塞算法简介 。CHL队列对应代码如下:

 /** 
 * CHL头节点 
 */   
rivate transient volatile Node head;  
/** 
 * CHL尾节点 
 */  
private transient volatile Node tail; 

Node节点是对Thread的一个封装,结构大概如下:

tatic final class Node {  
    /** 代表线程已经被取消*/  
    static final int CANCELLED =  1;  
    /** 代表后续节点需要唤醒 */  
    static final int SIGNAL    = -1;  
    /** 代表线程在等待某一条件/ 
    static final int CONDITION = -2; 
    /** 标记是共享模式*/  
    static final Node SHARED = new Node();  
    /** 标记是独占模式*/  
    static final Node EXCLUSIVE = null;  
  
    /** 
     * 状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、0 
     */  
    volatile int waitStatus;  
  
    /** 
     * 前置节点 
     */  
    volatile Node prev;  
  
    /** 
     * 后续节点 
     */  
    volatile Node next;  
  
    /** 
     * 节点代表的线程 
     */  
    volatile Thread thread;  
  
    /** 
     *连接到等待condition的下一个节点 
     */  
    Node nextWaiter;  
  
}  

5、小结

从源码可以看出AQS实现基本的功能:

1.同步器基本范式、结构

2.线程的阻塞、唤醒机制

3.线程阻塞队列的维护

AQS虽然实现了acquire,和release方法,但是里面调用的tryAcquire和tryRelease是由子类来定制的。可以认为同步状态的维护、获取、释放动作是由子类实现的功能,而动作成功与否的后续行为时有AQS框架来实现

还有以下一些私有方法,用于辅助完成以上的功能:

final boolean acquireQueued(final Node node, int arg) :申请队列

private Node enq(final Node node) : 入队

private Node addWaiter(Node mode) :以mode创建创建节点,并加入到队列

private void unparkSuccessor(Node node) : 唤醒节点的后续节点,如果存在的话。

private void doReleaseShared() :释放共享锁

private void setHeadAndPropagate(Node node, int propagate):设置头,并且如果是共享模式且propagate大于0,则唤醒后续节点。

private void cancelAcquire(Node node) :取消正在获取的节点

private static void selfInterrupt() :自我中断

private final boolean parkAndCheckInterrupt() : park 并判断线程是否中断

三、AQS在各同步器内的Sync与State实现

1、什么是state机制:

提供 volatile 变量 state; 用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性。对应源码里的定义:

/** 
 * 同步状态 
 */  
private volatile int state;  
  
/** 
 *cas 
 */  
protected final boolean compareAndSetState(int expect, int update) {  
    // See below for intrinsics setup to support this  
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
} 

2、不同实现类的Sync与State:

基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。

2.1、ReentrantLock

需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为ReetranLock的FairSync的tryAcquire实现代码解析。

//公平获取锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果当前重进入数为0,说明有机会取得锁
    if (c == 0) {
        //如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁
        if (isFirst(current) &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
     }
 }
    //如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
 }
     //以上条件都不满足,那么线程进入等待队列。
     return false;
}

2.2、Semaphore

则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现

protected int tryAcquireShared(int acquires) {
    Thread current = Thread.currentThread();
    for (;;) {
         Thread first = getFirstQueuedThread();
         //如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待
         if (first != null && first != current)
              return -1;
         //如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值
         int available = getState();
         int remaining = available - acquires;
         //如果remining<0,那么反馈给AQS当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,可以继续执行。
         if (remaining < 0 ||compareAndSetState(available, remaining))
             return remaining;
 }
}

2.3、CountDownLatch

闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。

//await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程
public int tryAcquireShared(int acquires) {
     return getState() == 0? 1 : -1;
}
 
//countDown 时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。
public boolean tryReleaseShared(int releases) {
    for (;;) {
         int c = getState();
         if (c == 0)
             return false;
         int nextc = c-1;
         if (compareAndSetState(c, nextc))
             return nextc == 0;
 }
}

2.4、FutureTask

需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。

//get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住
protected int tryAcquireShared(int ignore) {
     return innerIsDone()? 1 : -1;
}
 
//判定任务是否完成或者被Cancel
boolean innerIsDone() {
    return ranOrCancelled(getState()) &&    runner == null;
}
 
//get时调用,对于CANCEL与其他异常进行抛错
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    if (!tryAcquireSharedNanos(0,nanosTimeout))
        throw new TimeoutException();
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}
 
//任务的执行线程执行完毕调用(set(V v))
void innerSet(V v) {
     for (;;) {
        int s = getState();
        //如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)
        if (s == RAN)
            return;
        //如果被CANCEL了,那么释放等待线程,并且会抛错
        if (s == CANCELLED) {
            releaseShared(0);
            return;
     }
        //如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现)
        if (compareAndSetState(s, RAN)) {
            result = v;
            releaseShared(0);
            done();
            return;
     }
 }
}

以上4个AQS的使用是比较典型,然而有个问题就是这些状态存在哪里呢?并且是可以计数的。从以上4个example,我们可以很快得到答案,AQS提供给了子类一个int state属性。并且暴露给子类getState()和setState()两个方法(protected)。这样就为上述状态解决了存储问题,RetrantLock可以将这个state用于存储当前线程的重进入次数,Semaphore可以用这个state存储许可数,CountDownLatch则可以存储需要被countDown的次数,而Future则可以存储当前任务的执行状态(RUNING,RAN,CANCELL)。其他的Synchronizer存储他们的一些状态。

AQS留给实现者的方法主要有5个方法,其中tryAcquire,tryRelease和isHeldExclusively三个方法为需要独占形式获取的synchronizer实现的,比如线程独占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared为需要共享形式获取的synchronizer实现。

ReentrantLock内部Sync类实现的是tryAcquire,tryRelease, isHeldExclusively三个方法(因为获取锁的公平性问题,tryAcquire由继承该Sync类的内部类FairSync和NonfairSync实现)Semaphore内部类Sync则实现了tryAcquireShared和tryReleasedShared(与CountDownLatch相似,因为公平性问题,tryAcquireShared由其内部类FairSync和NonfairSync实现)。CountDownLatch内部类Sync实现了tryAcquireShared和tryReleasedShared。FutureTask内部类Sync也实现了tryAcquireShared和tryReleasedShared。

目录
相关文章
|
4月前
|
Java 开发者 C++
Java多线程同步大揭秘:synchronized与Lock的终极对决!
Java多线程同步大揭秘:synchronized与Lock的终极对决!
91 5
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
54 2
|
1月前
|
Java 调度
Java 线程同步的四种方式,最全详解,建议收藏!
本文详细解析了Java线程同步的四种方式:synchronized关键字、ReentrantLock、原子变量和ThreadLocal,通过实例代码和对比分析,帮助你深入理解线程同步机制。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 线程同步的四种方式,最全详解,建议收藏!
|
2月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
52 1
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
存储 Java
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....