4 AQS
4.1 前言
如果要想真正的理解JUC
下的并发工具的实现原理,我们必须要来学习AQS
,因为它是JUC
下很多类的基石。
在讲解AQS之前,如果老板让你自己写一个SDK层面的锁,给其他同事去使用,你会如何写呢?
1、搞一个状态标记,用来表示持有或未持有锁,但得是volatile
类型的保证线程可见性。
2、编写一个lock
,unlock
函数用于抢锁和释放锁,就是对状态标记的修改操作
3、lock
函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现
初步实现如下:
public class MyLock { // 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有 private volatile int status; private static final Unsafe unsafe = reflectGetUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (MyLock.class.getDeclaredField("status")); } catch (Exception ex) { throw new Error(ex); } } private static Unsafe reflectGetUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 阻塞式获取锁 * @return */ public boolean lock() { while (!compareAndSet(0,1)) { } return true; } // cas 设置 status public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * 释放锁 */ public void unlock() { status = 0; } }
问题:获取不到锁自旋时,是空转,浪费CPU
1、使用yield
让出CPU执行权,等待调度
public boolean lock() { while (!compareAndSet(0,1)) { Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源 } return true; }
或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。
2、采用等待唤醒机制,但是这里由于没有使用synchronized
关键字,所以也无法使用wait/notify
,但是我们可以使用park/unpark
,获取不到锁的线程park
并且去队列排队,释放锁时从队列拿出一个线程unpark
private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>(); public boolean lock() { while (!compareAndSet(0,1)) { QUEUE.offer(Thread.currentThread()); LockSupport.park();//线程休眠 } return true; } public void unlock() { status = 0; LockSupport.unpark(QUEUE.poll()); }
4.2 AQS概述
AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。
通过查阅作者的对于该类的文档注释可以得到如下核心信息:
1、AQS用一个volatile int state;
属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()
,setState(int newState)
,compareAndSetState(int expect, int update)
,其中CAS方法是核心。
2、框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列,
3、框架也内部也实现了条件变量Condition
,用它来实现等待唤醒机制,并且支持多个条件变量
4、AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch
5、使用者只需继承AbstractQueuedSynchronizer
并重写指定的方法,在方法内完成对共享资源state
的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些final
的模板方法里。
* <p>To use this class as the basis of a synchronizer, redefine the * following methods, as applicable, by inspecting and/or modifying * the synchronization state using {@link #getState}, {@link * #setState} and/or {@link #compareAndSetState}: * * <ul> * <li> {@link #tryAcquire} * <li> {@link #tryRelease} * <li> {@link #tryAcquireShared} * <li> {@link #tryReleaseShared} * <li> {@link #isHeldExclusively} * </ul> * * Each of these methods by default throws {@link * UnsupportedOperationException}. Implementations of these methods * must be internally thread-safe, and should in general be short and * not block. Defining these methods is the <em>only</em> supported * means of using this class. All other methods are declared * {@code final} because they cannot be independently varied.
6、AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可。
API | 说明 | |
final void acquire(int arg) | 独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire | 模板方法 |
boolean tryAcquire(int arg) | 独占模式 尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true | |
final boolean release(int arg) | 释放独占锁,AQS顶层已实现,内部调用了tryRelease | 模板方法 |
boolean tryRelease(int arg) | 尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true | |
final void acquireShared(int arg) | 共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared | 模板方法 |
int tryAcquireShared(int arg) | 尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,AQS中未实现,由子类实现 |
final boolean releaseShared(int arg) | 释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared | 模板方法 |
boolean tryReleaseShared(int arg) | 尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,AQS中未实现,需要由子类实现 |
boolean isHeldExclusively() | 共享资源是否被独占 |
4.3 基本使用
此时老板给你加了需求,要求你实现一个基于AQS的锁,那该怎么办呢?
在AbstractQueuedSynchronizer
的类注释中给出了使用它的基本方法,我们按照它的写法尝试即可
/** * 基于 aqs实现锁 */ public class MyLock implements Lock { //同步器 private Syn syn = new Syn(); @Override public void lock() { //调用模板方法 syn.acquire(1); } @Override public void unlock() { //调用模板方法 syn.release(0); } // 其他接口方法暂时先不实现 省略 // 实现一个独占同步器 class Syn extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0,arg)) { return true; } return false; } @Override protected boolean tryRelease(int arg) { setState(arg); return true; } } }
4.4 原理解析
自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁,虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况吧!
这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?
这我们就不得不深入我们使用的模板方法中看一眼了
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //结合我自己写的尝试获取锁的方法 protected boolean tryAcquire(int arg) { if (compareAndSetState(0,arg)) { return true; } return false; }
这里大概描述如下:
1、线程一来首先调用tryAcquire
,在tryAcquire
中直接CAS获取锁,如果获取不成功通过addWaiter
加入等待队列,然后走acquireQueued
让队列中的某个等待线程去获取锁。
2、不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?
那如何实现这种公平性呢?这就不得不探究一下AQS的内部的实现原理了,下面我们依次来看:
1、查看AbstractQueuedSynchronizer
的类定义,虽然它里面代码很多,但重要的属性就那么几个,
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private volatile int state; private transient volatile Node head; private transient volatile Node tail; static final class Node { //其他不重要的略 volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; } public class ConditionObject implements Condition, java.io.Serializable {...} }
结合前面讲的AQS的类文档注释不难猜到,内部类 Node
以及其类型的变量 head
和 tail
就表示 AQS 内部的一个等待队列,而剩下的 state
变量就用来表示锁的状态。
等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node
的属性我们知道,Node
存放了当前线程的指针 thread
,也即可以表示当前线程并对其进行某些操作,prev
和 next
说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。
2、AQS加锁最核心的代码就是如下,我们要来探究它的实现原理
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它的原理及整个过程我们以图的形式说明如下:
3、原理搞懂了,那如何让自定义的锁是公平的呢?
其实导致不公平的原因就是线程每次调用acquire
时,都会先去tryAcquire
,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。
所以现在的解决办法就是,在tryAcquire
时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。
那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个hasQueuedPredecessors
函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next
)或队列为空,则为false。
protected boolean tryAcquire(int arg) { //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁 if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) { return true; } return false; }
4、现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平锁,也支持非公平锁,让使用者可以自由选择,怎么办?
其实只要稍微改造一下即可,
public class MyLock implements Lock { //同步器 private Sync syn ; MyLock () { syn = new NoFairSync(); } MyLock (boolean fair) { syn = fair ? new FairSync():new NoFairSync(); } @Override public void lock() { //调用模板方法 syn.acquire(1); } @Override public void unlock() { //调用模板方法 syn.release(0); } // Lock接口其他方法暂时先不实现 略 // 实现一个独占同步器 class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryRelease(int arg) { setState(arg); return true; } } class FairSync extends Sync { @Override protected boolean tryAcquire(int arg) { //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁 if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) { return true; } return false; } } class NoFairSync extends Sync { @Override protected boolean tryAcquire(int arg) { //直接去获取锁 if (compareAndSetState(0,arg)) { return true; } return false; } } }
5、现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如下一段代码,发现一直获取不到锁
static Lock lock = new MyLock(); static void test3() { lock.lock(); try { System.out.println("test3 get lock,then do something "); test4(); } finally { lock.unlock(); } } static void test4() { lock.lock(); try { System.out.println("test4 get lock,then do something "); } finally { lock.unlock(); } }
那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。
查看AbstractQueuedSynchronizer
的定义我们发现,它还继承自另一个类:AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...} public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) {...} protected final Thread getExclusiveOwnerThread(){...} }
看到这我们明白了,原来AQS
中有个变量是可以保存当前持有独占锁的线程的。那好办了,当我们获取锁时,如果发现锁被持有不要着急放弃,先看看持有锁的线程是否时当前线程,如果是还能继续获取锁。
另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样
lock lock lock lock .... unlock unlock unlock unlock
所以对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。
故此时状态变量state不在只有两个取值0,1,某线程获取到锁state=1,如果当前线程重入获取只需增加状态值state=2,依次同理,锁释放时释放一次状态值-1,当state=0时才真正释放,其他线程才能继续获取锁。
修改我们锁的代码如下:公平非公平在可重入上的逻辑是一样的
public class MyLock implements Lock { //同步器 private Sync syn ; MyLock () { syn = new NoFairSync(); } MyLock (boolean fair) { syn = fair ? new FairSync():new NoFairSync(); } @Override public void lock() { //调用模板方法 syn.acquire(1); } @Override public void unlock() { //调用模板方法 syn.release(1); } // Lock接口其他方法暂时先不实现 略 // 实现一个独占同步器 class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryRelease(int arg) { if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } boolean realRelease = false; int nextState = getState() - arg; if (nextState == 0) { realRelease = true; setExclusiveOwnerThread(null); } setState(nextState); return realRelease; } } class FairSync extends Sync { @Override protected boolean tryAcquire(int arg) { final Thread currentThread = Thread.currentThread(); int currentState = getState(); if (currentState == 0 ) { // 可以获取锁 //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁 if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) { setExclusiveOwnerThread(currentThread); return true; } }else if (currentThread == getExclusiveOwnerThread()) { //重入逻辑 增加 state值 int nextState = currentState + arg; if (nextState < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextState); return true; } return false; } } class NoFairSync extends Sync { @Override protected boolean tryAcquire(int arg) { final Thread currentThread = Thread.currentThread(); int currentState = getState(); if (currentState ==0 ) { // 可以获取锁 //直接去获取锁 if (compareAndSetState(0,arg)) { setExclusiveOwnerThread(currentThread); return true; } }else if (currentThread == getExclusiveOwnerThread()) { //重入逻辑 增加 state值 int nextState = currentState + arg; if (nextState < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextState); return true; } return false; } } }
好了至此我们已经掌握了AQS
的核心原理以及它的一个经典实现ReentrantLock
几乎全部的知识点,此时打开ReentrantLock
的源码你会发现一切都很清爽!!!
5 并发容器
juc中还包含很多其他的并发容器(了解)
1.ConcurrentHashMap
对应:HashMap
目标:代替Hashtable、synchronizedMap,使用最多,源码篇会详细讲解
原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized
2.CopyOnWriteArrayList
对应:ArrayList
目标:代替Vector、synchronizedList
原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。
查看源码:volatile array,lock加锁,数组复制
3.CopyOnWriteArraySet
对应:HashSet
目标:代替synchronizedSet
原理:与CopyOnWriteArrayList实现原理类似。
4.ConcurrentSkipListMap
对应:TreeMap
目标:代替synchronizedSortedMap(TreeMap)
原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。
附加:跳表
5.ConcurrentSkipListSet
对应:TreeSet
目标:代替synchronizedSortedSet(TreeSet)
原理:内部基于ConcurrentSkipListMap实现,原理一致
6.ConcurrentLinkedQueue
对应:LinkedList
对应:无界线程安全队列
原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列
7.BlockingQueue
对应:Queue
特点:拓展了Queue,增加了可阻塞的插入和获取等操作
原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒
实现类:
- LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
- ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
- PriorityBlockingQueue:按优先级排序的队列