6.AQS架构
6.1它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
6.2AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)
6.3不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种模板方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int): 独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
6.4以ReentrantLock的源码为例来深入理解AQS
ReentrantLock是基于AQS的独占锁来实现的
6.4.1.state初始化时为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1
6.4.2.此后,其他线程再tryAcquire()会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获得该锁
6.4.3 ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true 。非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
ReentrantLock类结构
下面看下Sync类源代码:
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { //releasesy一般传1 int c = getState() - releases; //当前线程不是独占的不满足条件抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果state - releasesd等于0把空闲状态free置为true,同时把独占线程置为null if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置新的state setState(c); return free; } protected final boolean isHeldExclusively() { //判断Thread是否和当前线程相等从而判断是否为独占的 return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { //等于0表示没有任何线程占用这个资源,有的话返回占用的线程 return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
FairSync 类里的tryAcquire方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //队列里找不到等待的其他线程,就设置为当前线程独占,非公平锁(具体实现是Sync类的 //nonfairTryAcquire方法,见上面代码)下的tryAcquire没有!hasQueuedPredecessors //判断其他代码都一样 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //可重入锁主要体现在这段逻辑,占用的线程就是当前线程把state加1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AbstractQueuedSynchronizer的 acquire方法
public final void acquire(int arg) { //1.addWaiter(Node.EXCLUSIVE)把最后一个线程放在队列尾部 //2.acquireQueued循环判断新入的节点是否为前驱节点,是前驱节点时 //3.整个if的判断逻辑是没有获得锁成功的话加入等待队列的的再打断自己 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire方法调用流程图
AbstractQueuedSynchronizer的 acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //不断自旋 for (;;) { final Node p = node.predecessor(); //是前驱节点时,尝试获取许可tryAcquire if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
6.5以CountDownLatch的原理为例来深入理解AQS
CountDownLatch是基于AQS的共享锁来实现的,由于内部类继承了AQS,所以它内部也是FIFO队列,同时也一样是前驱节点唤醒后继节点,不能像CyclicBarrier那样使用完毕后还可以复用;
6.5.1任务分为N个任子线程去执行,state也初始化为N(注意N要要与线程个数一致)
6.5.2这N个线程也是并行执行的,每个子线程执行完后后countDown()一次,state会CAS减1
6.5.3等到所有子线程都执行完成之后即(state==0),会调用LockSupport.unpark(s.thread)
6.5.4然后主调用会从await()函数返回(之前是通过LockSupport.park(this);阻塞),继续后余动作
源码分析: 【JDK源码分析】并发包同步工具CountDownLatch - 还是搬砖踏实 - 博客园
7.ReentrantLock使用场景
场景1:如果发现该操作已经在执行中则不再执行(有状态执行)
ReentrantLock lock = new ReentrantLock(); if(lock.tryLock()){ try{ }finally{ lock.unlock(); } }
场景2:如果发现该操作已经在执行,等待一个一个执行(同步执行,类似synchronized)
场景3:如果发现该操作已经在执行,则尝试等待一段时间,等待超时则不执行(尝试等待执行)
import java.util.concurrent.*; import java.util.concurrent.locks.*; public class AttemptLocking { private ReentrantLock lock = new ReentrantLock(); public void untimed() { boolean captured = lock.tryLock(); try { System.out.println("tryLock(): " + captured); } finally { if (captured) lock.unlock(); } } public void timed() { boolean captured = false; try { captured = lock.tryLock(2, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } try { System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured); } finally { if (captured) lock.unlock(); } } public static void main(String[] args) throws InterruptedException { final AttemptLocking al = new AttemptLocking(); al.untimed(); // True -- 可以成功获得锁 al.timed(); // True --可以成功获得锁 //新创建一个线程获得锁并且不释放 new Thread() { { setDaemon(true); } public void run() { al.lock.lock(); System.out.println("acquired"); } }.start(); Thread.sleep(100);// 保证新线程能够先执行 al.untimed(); // False -- 马上中断放弃 al.timed(); // False -- 等两秒超时后中断放弃 } }
场景4:如果发现该操作已经在执行,等待执行。这时可中断正在进行的操作立刻释放锁继续下一操作(类似于wait())
ReentrantLock lock = new ReentrantLock(); try{ lock.lockInterruptibly(); }finally{ lock.unlock(); }
另外在ReentrantLock类中定义了很多方法,比如:
isFair() //判断锁是否是公平锁
isLocked() //判断锁是否被任何线程获取了
isHeldByCurrentThread() //判断锁是否被当前线程获取了
hasQueuedThreads() //判断是否有线程在等待该锁
8、线程中断
Thread.currentThread().interrupt()
线程中断只是一个状态而已,true表示已中断,false表示未中断
//获取线程中断状态,如果中断了返回true,否则返回false
Thread.currentThread().isInterrupted()
设置线程中断不影响线程的继续执行,但是线程设置中断后,线程内调用了wait、jion、sleep方法中的一种, 立马抛出一个 InterruptedException,且中断标志被清除,重新设置为false。
这个恢复过来就可以包含两个目的:
一、[可以使线程继续执行],那就是在catch语句中招待醒来后的逻辑,或由catch语句转回正常的逻辑。总之它是从wait,sleep,join的暂停状态活过来了。
二、[可以直接停止线程的运行],当然在catch中什么也不处理,或return,那么就完成了当前线程的使命,可以使在上面"暂停"的状态中立即真正的"停止"。
9、ABA问题
ABA问题是指在CAS操作中带来的潜在问题
CAS意思是 compare and swap 或者 compare and set , CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。
如果CAS操作是基于CPU内核的原子操作,那基本是不会出现ABA问题的,但是如果CAS本身操作不满足原子性,则会带来ABA问题,
比如两个线程
- 线程1 查询A的值为a,与旧值a比较,
- 线程2 查询A的值为a,与旧值a比较,相等,更新为b值
- 线程2 查询A的值为b,与旧值b比较,相等,更新为a值
- 线程1 相等,更新B的值为c
可以看到这样的情况下,线程1 可以正常 进行CAS操作,将值从a变为c 但是在这之间,实际A值已经发了a->b b->a的转换
仔细思考,这样可能带来的问题是,如果需要关注A值变化过程,是会漏掉一段时间窗口的监控
10、Lock锁和Condition条件
Lock的特性:
1).Lock不是Java语言内置的;
2).synchronized是在JVM层面上实现的,如果代码执行出现异常,JVM会自动释放锁,但是Lock不行,要保证锁一定会被释放,就必须将unLock放到finally{}中(手动释放);
3).在资源竞争不是很激烈的情况下,Synchronized的性能要优于ReetarntLock,但是在很激烈的情况下,synchronized的性能会下降几十倍(新版本的jdk新能相差不大);
4).ReentrantLock增加了锁:
a. void lock(); // 无条件的锁;
b. void lockInterruptibly throws InterruptedException;//可中断的锁;
解释: 使用ReentrantLock如果获取了锁立即返回,如果没有获取锁,当前线程处于休眠状态,直到获得锁或者当前线程可以被别的线程中断去做其他的事情;但是如果是synchronized的话,如果没有获取到锁,则会一直等待下去;
c. boolean tryLock();//如果获取了锁立即返回true,如果别的线程正持有,立即返回false,不会等待;
d. boolean tryLock(long timeout,TimeUnit unit);//如果获取了锁立即返回true,如果别的线程正持有锁,会等待参数给的时间,在等待的过程中,如果获取锁,则返回true,如果等待超时,返回false;
Condition的特性:
1.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。
2.Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。
如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。