Juc并发编程06——深入剖析队列同步器AQS源码

简介: 我们看下Reentrantock的源码

我们看下Reentrantock的源码。

public void lock() {
        sync.lock();
    }
   public void unlock() {
        sync.release(1);
    }
原来

lock,unlock等核心方法都是通过sync来实现的。而sync其实是它的一个内部类。

abstract static class Sync extends AbstractQueuedSynchronizer {...}
1

这个内部类继承了AbstractQueuedSynchronizer,也就是我们今天要重点介绍的队列同步器AQS。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。


线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。

我们先来了解以下每个Node有什么内容,点开AQS的源码,它定义了内部类Node。


static final class Node {
        // 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁    
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        // 定义等待状态
        // CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消
        static final int CANCELLED =  1;
        // 此节点后面的节点被挂起,进入等待状态
        static final int SIGNAL    = -1;
        // 在条件队列中的状态
        static final int CONDITION = -2;
        // 传播,一般用于共享锁
        static final int PROPAGATE = -3;
        volatile int waitStatus; //等待状态值
        volatile Node prev; //双向链表基本操作
        volatile Node next;
        volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列
        Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点
       // 判断是否为共享节点
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
       // 返回前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {   // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker)
        }
        Node(Thread thread, Node mode) {     // 等待队列使用
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // 条件对立使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

再跳出Node看AQS,它定义了三个属性,head,tail默认为null,state默认为0,并且AQS的构造器并没有给它们赋值。


protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }


实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的操作。

protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

原来是通过unsafe的compareAndSwapInt()实现的。这个是CAS算法。不妨看看unsafe,它也是内部的属性。

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
  // 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址)
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    /**
     * CAS 操作头节点
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    /**
     * CAS 操作尾节点
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    /**
     * CAS 操作WaitStatus属性
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
    /**
     * CAS 操作next属性 
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

其实,Unsafe里面调用的都是native方法,读者可以自己点进去看看。它会直接找到属性的内存地址,操作内存中的数据,效率比较高。在AQS中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe中的方法时会将偏移地址传过去哦。


并且我们回过头看看,这里unsafe操作的属性在被定义时都是定义为volatile修饰,这是因为他们在被修改时都是使用的CAS算法,我们要使用vilotile修饰保证其可见性。

private volatile int state; // 当前锁的状态

现在我们已经大致了解了AQS的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。

// 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
   // 独占式释放同步状态
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
   // 共享式获取同步状态,返回值大于0表示成功,否则失败
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
   // 共享式释放同步状态
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
   // 是否在独占模式下被当前线程占有(是否当前线程持有锁)
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

现在我们以ReentantLock的公平锁为例,看看它怎么被重写的。

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() {
            acquire(1);
        }
        ...
}

先看看lock方法。调AQS的acquire()方法。里面会调用AQS中定义的tryAquire方法。而在ReentrantLock中tryAuire方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued,其内部调用了addWaiter。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。


public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE
            selfInterrupt();
    }

到addWaiter中看看。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试用CAS直接入队,如果CAS入队成功,则return
        Node pred = tail;
        if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // CAS失败(其它线程也在获取锁或者tail节点为空无法cas)
        enq(node);
        return node;
    }

上面的注释写的很清楚,我们接着看看enq怎么实现的,它其实是AQS的一个自旋机制。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 说明头尾节点没有初始化
                if (compareAndSetHead(new Node())) //新建空节点置为头节点
                    tail = head; //头尾节点指向同一个节点
            } else {
                node.prev = t; //队列插入节点操作,把当前节点的prev指向尾节点
                if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued,我们接着来看下acquireQueued。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。


其过程可以结合下图理解

相关文章
|
8天前
|
存储 Java
AQS(AbstractQueuedSynchronizer,队列同步器)源码解读
AQS(AbstractQueuedSynchronizer,队列同步器)源码解读
|
4月前
|
Java
JUC并发编程之等待唤醒机制
在JUC(Java Util Concurrent)并发编程中,线程等待唤醒机制是实现线程之间协作和同步的重要手段。这种机制允许一个线程挂起等待某个条件满足后被唤醒,以及另一个线程在满足某个条件后唤醒等待的线程。在Java中,有多种实现线程等待唤醒机制的方式,包括使用Object的wait()和notify()方法、Condition接口以及LockSupport类。
|
5月前
|
存储 设计模式 算法
队列同步器AQS-AbstractQueuedSynchronizer 原理分析
队列同步器AQS-AbstractQueuedSynchronizer 原理分析
45 0
|
12月前
|
Java 调度 容器
并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍
并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍
88 0
AQS(abstractQueuedSynchronizer)锁实现原理详解
AQS(abstractQueuedSynchronizer)抽象队列同步器。其本身是一个抽象类,提供lock锁的实现。聚合大量的锁机制实现的共用方法。
117 0
|
存储 前端开发 安全
【JUC并发编程】 详解锁与队列
【JUC并发编程】 详解锁与队列
124 0
【JUC并发编程】 详解锁与队列
|
Java
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
124 0
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
|
设计模式 Java
JUC并发编程——AQS源码解读
JUC并发编程——AQS源码解读
151 0
JUC并发编程——AQS源码解读
|
安全 Java
Java并发之AQS源码分析(二)
我在 Java并发之AQS源码分析(一)这篇文章中,从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,如果你把这部分搞明白了,再看共享锁的实现原理,思路就会清晰很多。下面我们继续从源码中窥探共享锁的实现原理。
126 0
Java并发之AQS源码分析(二)
|
存储 Java
Java并发之AQS源码分析(一)
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
105 0
Java并发之AQS源码分析(一)