我们看下Reentrantock的源码。
public void lock() { sync.lock(); } public void unlock() { sync.release(1); } 原来
lock,unlock等核心方法都是通过sync来实现的。而sync其实是它的一个内部类。
abstract static class Sync extends AbstractQueuedSynchronizer {...} 1
这个内部类继承了AbstractQueuedSynchronizer,也就是我们今天要重点介绍的队列同步器AQS。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。
线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。
我们先来了解以下每个Node有什么内容,点开AQS的源码,它定义了内部类Node。
static final class Node { // 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁 static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 定义等待状态 // CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消 static final int CANCELLED = 1; // 此节点后面的节点被挂起,进入等待状态 static final int SIGNAL = -1; // 在条件队列中的状态 static final int CONDITION = -2; // 传播,一般用于共享锁 static final int PROPAGATE = -3; volatile int waitStatus; //等待状态值 volatile Node prev; //双向链表基本操作 volatile Node next; volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列 Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点 // 判断是否为共享节点 final boolean isShared() { return nextWaiter == SHARED; } // 返回前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker) } Node(Thread thread, Node mode) { // 等待队列使用 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // 条件对立使用 this.waitStatus = waitStatus; this.thread = thread; } }
再跳出Node看AQS,它定义了三个属性,head,tail默认为null,state默认为0,并且AQS的构造器并没有给它们赋值。
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的操作。
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
原来是通过unsafe的compareAndSwapInt()实现的。这个是CAS算法。不妨看看unsafe,它也是内部的属性。
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; // 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址) static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * CAS 操作头节点 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS 操作尾节点 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * CAS 操作WaitStatus属性 */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * CAS 操作next属性 */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
其实,Unsafe里面调用的都是native方法,读者可以自己点进去看看。它会直接找到属性的内存地址,操作内存中的数据,效率比较高。在AQS中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe中的方法时会将偏移地址传过去哦。
并且我们回过头看看,这里unsafe操作的属性在被定义时都是定义为volatile修饰,这是因为他们在被修改时都是使用的CAS算法,我们要使用vilotile修饰保证其可见性。
private volatile int state; // 当前锁的状态
现在我们已经大致了解了AQS的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。
// 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 独占式释放同步状态 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享式获取同步状态,返回值大于0表示成功,否则失败 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享式释放同步状态 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否在独占模式下被当前线程占有(是否当前线程持有锁) protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
现在我们以ReentantLock的公平锁为例,看看它怎么被重写的。
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } ... }
先看看lock方法。调AQS的acquire()方法。里面会调用AQS中定义的tryAquire方法。而在ReentrantLock中tryAuire方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued,其内部调用了addWaiter。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE selfInterrupt(); } 跟
到addWaiter中看看。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先尝试用CAS直接入队,如果CAS入队成功,则return Node pred = tail; if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // CAS失败(其它线程也在获取锁或者tail节点为空无法cas) enq(node); return node; }
上面的注释写的很清楚,我们接着看看enq怎么实现的,它其实是AQS的一个自旋机制。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 说明头尾节点没有初始化 if (compareAndSetHead(new Node())) //新建空节点置为头节点 tail = head; //头尾节点指向同一个节点 } else { node.prev = t; //队列插入节点操作,把当前节点的prev指向尾节点 if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点 t.next = node; return t; } } } }
addWaiter终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued,我们接着来看下acquireQueued。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。
其过程可以结合下图理解