AQS详解之独占锁模式-阿里云开发者社区

开发者社区> 安全> 正文

AQS详解之独占锁模式

简介: AbstractQueuedSynchronizer简称AQS,即队列同步器。它是JUC包下面的核心组件,它的主要使用方式是继承,子类通过继承AQS,并实现它的抽象方法来管理同步状态,它分为独占锁和共享锁。

AQS 介绍

AbstractQueuedSynchronizer简称AQS,即队列同步器。它是JUC包下面的核心组件,它的主要使用方式是继承,子类通过继承AQS,并实现它的抽象方法来管理同步状态,它分为独占锁和共享锁。很多同步组件都是基于它来实现的,比如我门常见的ReentrantLock,它是基于AQS的独占锁实现的,它表示每次只能有一个线程持有锁。在比如ReentrantReadWriteLock它是基于AQS的共享锁实现的,它允许多个线程同时获取锁,并发的访问资源。AQS是建立在CAS上的一种FIFO的双向队列,它通过维护一个int类型的state,这个state是用volatile来修饰,从而保证状态的安全行。
AQS对于状态的更改提供了3个方法:

  1. getState() :返回同步状态的当前值
  2. setState() : 设置当前同步状态
  3. compareAndSetState():使用CAS设置当前状态,该方法能够保证状态的原子性。它是通过Unsafe这个类中的native方法来保证的。

AQS原理

如果请求的共享资源空闲,那么就把当前请求的线程设置为工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源占用,那么需要一套线程阻塞等待以及唤醒的锁的分配机制。那么这套机制AQS是用CLH队列锁实现的,获取不到锁的线程将加入到队列中。AQS内部维护的一个同步队列,获取失败的线程会加入到队列中进行自旋,移除队列条件是前驱节点是头节点并且成功获取到了同步状态,释放同步状态AQS会调用unparkSuccessor方法唤醒后继节点。

AQS数据结构

AQS队列内部维护的是一个FIFO的双向链表,如下图。这种结构的特点是每个数据结构都有2个指针,分别指向直接前驱节点和直接的后继节点。这种结构可以从任意的一个节点开始很方便的访问前驱和后继节点。每个Node由线程封装,当竞争失败后会加入到AQS队列中去。
AQS_

下面具体看一下Node组成:

static final class Node {
    /** 表示节点正处于共享模式下等待标记 */
    static final Node SHARED = new Node();
    /** 表示节点处于独占锁模式的等待标记 */
    static final Node EXCLUSIVE = null;
    /** waitStatus值,表示线程取消 */
    static final int CANCELLED =  1;
    /** waitStatus值,表示线程需要挂起 */
    static final int SIGNAL    = -1;
    /** waitStatus值,表示线程处于等待条件*/
    static final int CONDITION = -2;
    /**waitStatus值,表示下一个共享模式应该无条件传播*/
    static final int PROPAGATE = -3;
    /**状态字段*/
    volatile int waitStatus;
    /**前驱节点 */
    volatile Node prev;
    /**后继节点 */
    volatile Node next;
    /**当前线程*/
    volatile Thread thread;
    /**将此节点入列的线程,用来来接下一个节点*/
    Node nextWaiter;
    /**如果节点在共享模式下等待,则返回true*/
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**返回上一个节点,如果为null则抛出异常,前驱节点不是null使用 */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // 用于建立初始化head节点
    }
    Node(Thread thread, Node mode) {     // 由addWaiter使用
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // 由Condition使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS添加节点

AQS将节点加入到同步队列的过程图,如下:
AQS_
加入队列的过程必须是线程安全的,所以AQS提供了一个基于CAS设置尾节点的方法compareAndSetTail,这个也是unsafe类中的native方法。它需要传入当前线程的认为的尾节点和当前节点,当设置成功后,当前节点和尾部节点建立关联,当前节点正式加入到队列。

AQS重要方法

AQS使用了模版方法模式,自定义同步器需要重写下面的几个AQS提供的模版方法:

isHeldExclusively()//该线程是否处于独占资源。只有用到condition才需要实现它.
tryAcquire(int)//独占方式获取资源,成功返回true,失败返回false
tryRelease(int)//独占方式释放资源,成功返回true,失败返回false
tryAcquireShared(int)//共享方式获取资源。负数表示失败,0表示成功但是没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int)//共享方式释放资源.成功返回true,失败返回false.

AQS独占锁模式

独占锁的获取是通过AQS提供的acquire()。我门看一下这个方法的源代码:

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

发现acquire()获取同步状态成功与否做了2件事情。1成功,方法结束返回,2失败,会将当前线程加入到同步队列,它是通过调用addWaiter()和acquireQueued()方法实现的,我门继续看一下这2个方法的源代码.

private Node addWaiter(Node mode) {
   Node node = new Node(Thread.currentThread(), mode);
   Node pred = tail;
   if (pred != null) {
       node.prev = pred;
       if (compareAndSetTail(pred, node)) {
           pred.next = node;
           return node;
      }
  }
   enq(node);
   return node;
}

通过方法会发现它会先把当前线程封装为Node类型,然后判断尾节点是否为空,如果不为空进行CAS操作入队列,如果为空,那么会调用enq()这个方法,此方法做了通过不断的for循环自旋CAS尾插入节点。

现在我门已经明白独占锁获取失败入队列的过程了,那么对于同步队列的节点会做什么事情来保证自己有机会获取独占锁呢?我门来看一下acquireQueued()这个方法的源代码

final boolean acquireQueued(final Node node, int arg) {
   boolean failed = true;
   try {
       boolean interrupted = false;
       for (;;) {
           final Node p = node.predecessor();//获取前驱节点
           if (p == head && tryAcquire(arg)) {//当前节点是头节点并且成功获取到同步状态,那么获取到锁
               setHead(node);                 
               p.next = null; // help GC
               failed = false;
               return interrupted;
          }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())//获取失败调用的方法
               interrupted = true;
      }
  } finally {
       if (failed)
           cancelAcquire(node);
  }
}

从源代码我门可以看出来这是一个自旋过程(for(;;)),它首先获取当前节点的前驱节点,然后判断当前节点能否获取独占锁,如果前驱节点是头节点并且获取同步状态,那么就可以获取到独占锁。如果获取锁失败线程会进入等待状态等待获取独占锁。

shouldParkAfterFailedAcquire()这个方法主要的逻辑是调用compareAndSetWaitStatus(),使用CAS将节点状态由INITIAL设置为SIGNAL。如果失败会返回false,通过acquireQueued()的自旋转会继续设置,直到设置成功。设置成功后调用parkAndCheckInterrupt()方法,此方法会调用LockSupport.park(this)让该线程阻塞。到此独占锁获取过程已经分析完毕了。

AQS独占锁获取流程图

AQS_

独占锁释放

独占锁的释放是用relase()方法,我门来看一下源代码

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
  }
   return false;
}

这段代码的逻辑就很容易理解了,如果同步状态释放成功,则执行if语句内的代码,当head不为空并且状态不为0的时候会执行unparkSuccessor()方法,unparkSuccessor方法会执行LookSupport.unpark()方法.每一次释放锁就会唤醒队列中该节点的后继节点,可以进一步的说明获取锁是一个先进先出的过程。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
+ 订阅

云安全开发者的大本营

其他文章