前言
上篇文章10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)说到JUC并发包中的同步组件大多使用AQS来实现
本篇文章通过AQS自己来实现一个同步组件,并从源码级别聊聊JUC并发包中的常用同步组件
本篇文章需要的前置知识就是AQS,如果不了解AQS的同学可以看上一篇文章哈~
阅读本篇文章大概需要13分钟
自定义同步组件
为了更容易理解其他同步组件,我们先来使用AQS自己来实现一个常用的可重入锁
AQS模板方法流程是固定的,我们主要只需要来实现它的尝试获取同步状态和尝试释放同步状态方法即可
首先我们先规定要实现的可重入锁是独占式的
规定同步状态一开始为0,当有线程获取锁成功同步状态就为1,当这个线程重入时就累加同步状态
规定释放同步状态时每次扣减1个同步状态,只有当同步状态扣减到0时,才是真正的释放独占锁
我们使用一个内部类Sync 来继承AQS 并重写tryAcquire
尝试获取同步状态、tryRelease
尝试释放同步状态、isHeldExclusively
判断当前线程是否持有同步状态(等待、通知时会用到该方法)
static class Sync extends AbstractQueuedSynchronizer { /** * 判断当前线程是否持有同步状态 * * @return */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } }
在获取同步状态中
- 先判断是否有同步状态(即同步状态是否为0),如果有同步状态就用CAS去获取(0->1),成功就设置当前线程为获取同步状态的线程
- 如果没有同步状态(即同步状态不为0) ,就查看获取同步状态的线程是否为当前线程,如果是当前线程则说明此次是重入,累加重入次数
- 其他情况说明未获取到同步状态,返回false 后续走AQS流程(构建节点加入AQS)
/** * 尝试获取同步状态 * * @param arg 获取同步状态的数量 * @return */ @Override protected boolean tryAcquire(int arg) { //1.获取同步状态 int state = getState(); //2.如果有同步状态则CAS替换 0->1 if (state == 0) { if (compareAndSetState(state, 1)) { //替换成功 说明获取到同步状态 设置当前获取同步状态线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } } else if (getExclusiveOwnerThread() == Thread.currentThread()) { //3.没有同步状态 查看获取同步资源的线程是否为当前线程 可重入 累加重入次数 setState(state + arg); return true; } //其他情况就是没获取到同步状态 return false; }
在释放同步状态中
只有当同步状态要改成0时才是真正释放,否则情况情况下就是重入扣减次数
/** * 尝试释放同步状态 * * @param arg 释放同步状态的数量 * @return */ @Override protected boolean tryRelease(int arg) { //目标状态 int targetState = getState() - arg; //真正释放锁 if (targetState == 0) { setExclusiveOwnerThread(null); setState(targetState); return true; } //其他情况 扣减状态 setState(targetState); return false; }
使用内部类实现AQS的方法后,我们在自定义同步组件类中去实现Lock接口,并用内部类实现AQS的方法去实现Lock接口的方法
将要获取、释放的同步状态都设置成1,对应响应中断、超时的方法就用AQS中对应的方法即可
public class MySynchronizedComponent implements Lock { public MySynchronizedComponent() { sync = new Sync(); } private Sync sync; @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.new ConditionObject(); } }
实际上我们只需要去实现尝试获取、释放同步状态方法就能够完成自己的同步组件,这就是使用AQS带来的好处
代码案例可以去git仓库获取,放在本文最后
ReentrantLock
ReentrantLock是并发包中提供的可重入锁,它除了能够实现synchronized的功能外还可以响应中断、超时、实现公平锁等,其底层也是通过AQS来实现的
ReentrantLock的功能与synchronized类似,可重入的独占锁,用于保证并发场景下同步操作
使用时需要显示加锁、解锁,常用格式如下:
reentrantLock.lock(); try{ //.... }finally { reentrantLock.unlock(); }
finally中最先去解锁,并且加锁要放在try块的最外层,并保证加锁和try块之间不会抛出异常
加锁不放在try中是因为加锁实现未知可能抛出不受检查unchecked的异常,当加锁抛出异常时,后续finally块解锁也会抛出非法监视器的异常从而导致覆盖
加锁和try块之间如果抛出异常,那么就无法执行解锁了
ReentrantLock除了提供基本的同步功能,还提供响应中断、超时的API,同学们可以私下去查看
熟悉ReentrantLock实现的同学,可能看上面自定义同步组件的代码很熟悉,其实就是参考ReentrantLock非公平锁写的
ReentrantLock中使用内部类Sync来继承AQS,同时内部类NonfairSync和FairSync来继承Sync去实现非公平、公平的获取同步状态
非公平锁尝试获取同步状态 流程类似就不过多描述
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
那公平锁如何来实现获取同步状态呢?
其实看过上篇AQS文章的同学就知道了,在上篇文章中已经说过
只需要在尝试获取同步状态前加上一个条件:队列中是否有前置任务(即在队列中FIFO排队获取)
公平锁也是这么去实现的,前置条件hasQueuedPredecessors
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
ReentrantReadWriteLock
功能与实现
ReentrantReadWriteLock在ReentrantLock功能的基础上,提供读写锁的功能,让锁的粒度更细
在一些读多写少的场景下是允许同时读的,允许多个线程获取,其实想到了AQS的共享式,读锁也就是共享式
在读读的场景下,都是读锁/共享锁,不会进行阻塞
在读写、写读、写写的场景下,都会进行阻塞
比如要获取写锁时,需要等待读锁、写锁都解锁;要获取读锁时,需要等待写锁解锁
ReentrantReadWriteLock 在 ReentrantLock 的基础上增加ReadLock
和WriteLock
分别作为读锁和写锁
实际上读锁就是共享锁、写锁就是独占锁,在实现加锁、解锁的方法时分别调用共享式、独占式的获取、释放同步状态即可
在构造时,读写锁中实际使用的都是同一个AQS
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //读锁构造 protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //写锁构造 protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
即同步状态会被读写锁共享,那么它们如何查看/修改自己的那部分同步状态呢?
在读写锁中,同步状态被一分为二,高16位的同步状态是读锁的,低16位的同步状态是写锁的
当线程获取写锁时,写状态+1,由于写状态在低位,相当于同步状态+1
当线程获取读锁时,读状态+1,由于读状态在高位,相当于同步状态+(1<<16)
写锁获取
写锁的获取实现在sync.tryAcquire
中 sync可以是公平也可以是非公平,实际上是独占式的获取
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //得到同步状态c int c = getState(); //得到写状态(同步状态低16位 与上 全1) int w = exclusiveCount(c); if (c != 0) { //同步状态不为0,写状态为0,说明读状态不为0,读锁已经被获取,此时获取写锁失败 //同步状态不为0,写状态也不为0,查看当前线程是否是获取写锁的线程,不是的话获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; //只有当前线程获取过写锁才能进入这里 //如果原来的写状态+这次重入的写状态 超过了 同步状态的0~15位 则抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //设置同步状态 因为写状态在低16位所以不用左移 (重入累加) setState(c + acquires); return true; } //同步状态为0 无锁时 //writerShouldBlock在非公平锁下返回false 在公平锁下查看是否有前驱任务 //如果CAS失败则返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //CAS成功则 设置当前线程为获得独占锁(写锁)的线程 setExclusiveOwnerThread(current); return true; }
查看源码可以知道:
- 当有锁时(同步状态不为0情况),如果只有读锁(没有写锁),那么直接失败;如果只有写锁则查看当前线程是否为获取写锁的线程(重入情况)
- 当无锁时进行CAS获取写锁,成功则设置获取写锁的线程,失败则返回
根据源码分析可以知道,写锁允许重入,并且获取写锁时,如果有读锁会被阻塞
写锁释放
写锁的释放实现在sync.tryRelease
中
protected final boolean tryRelease(int releases) { //判断当前线程是不是获取写(独占)锁的线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //新状态 int nextc = getState() - releases; //如果新状态低16位为0(没有写锁)就设置获取写锁的线程为空,然后设置同步状态,再返回 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
释放其实也类似,只有当写状态为0时才是真正释放,其他情况都是扣减重入次数
读锁获取
读锁的获取也就是共享式的获取
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //同步状态 int c = getState(); //exclusiveCount 为获取写锁状态 低16位全与1 //如果有写锁 并且 获取写锁的线程不是当前线程 则失败(说明允许同一线程获取写锁再获取读锁) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读状态 (同步状态右移16位) int r = sharedCount(c); //读没被阻塞 没超过最大值 且CAS成功 记录信息 返回成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
在读锁中允许同一线程获取写锁再获取读锁
在某些场景下要先写数据再读数据,比如:
- 获取写锁
- 写数据
- 释放写锁
- 使用(读)数据
这样会导致释放完写锁后,其他线程可以获取写锁,从而导致第四步会出现脏读
正确的用法应该在释放写锁前获取读锁:
- 获取写锁
- 写数据
- 获取读锁
- 释放写锁
- 读数据
这样其他线程获取写锁时因为都读锁会被阻塞,而其他线程需要读时又不会被阻塞
在读多写少的场景,读写锁粒度更细,读读不阻塞,并发性能更好
信号量
功能
信号量用于控制同时访问资源的线程数量
线程访问资源时需要先拿到信号量才能访问,访问完释放信号量,信号量允许同时N个线程获取
下面是控制同时只能有2个线程获取到信号量
//初始化信号量 Semaphore semaphore = new Semaphore(2); //每次只有两个线程能够获取到信号量执行 ExecutorService executor = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { executor.execute(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"获得资源"); //执行任务 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放资源======"); semaphore.release(); } }); } executor.shutdown();
实现
熟悉AQS的同学应该可以猜到信号量其实就是通过共享式实现的
信号量构造时提供初始化信号量的数量,实际上就是初始化同步状态,比如设置2个信号量就是设置同步状态为2;还可以在构造中设置公平、非公平
在获取信号量时,使用响应中断的共享式,在非公平情况下执行nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取同步状态 int available = getState(); //目标同步状态 int remaining = available - acquires; //没有信号量 或 CAS成功 都会返回目标同步状态 为负数时获取失败 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
在获取时实际上就是扣减要获取的信号量,可能多个线程同时获取信号量,使用CAS+失败重试保证原子性,直到没有信号量或CAS成功
在释放信号量时实际就是加上释放的信号量,可能多个线程同时释放信号量,因此释放时使用CAS+失败重试保证原子性
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
CountDownLatch
CountDownLatch 相当于一个计数器,在构造时设置计数数量
功能
调用countDown
方法会对数量进行自减
调用await
方法时,如果还有数量没被扣减完,则会阻塞,直到数量都被扣减完
当一个线程执行N个任务,或者多个线程执行一个任务时,要等待它们执行完再进行下一步操作时,就可以使用CountDownLatch
//初始化10 CountDownLatch countDownLatch = new CountDownLatch(10); //固定线程池 ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 1; i <= 10; i++) { final int index = i; executor.execute(() -> { System.out.println(Thread.currentThread() + "处理任务" + index); //执行任务... //数量-1 countDownLatch.countDown(); }); } //计数量为0时才可以继续执行 countDownLatch.await(); System.out.println("处理完任务"); executor.shutdown();
实现
其实它的实现与信号量类似,也是通过共享式
在构造中设置初始值时,实际上就是在设置同步状态
当执行countDown
扣减数量时,实际上就是在扣减同步状态 ,由于可能多线程同时执行,使用CAS+失败重试保证扣减同步状态成功
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
执行await
时,实际就是判断同步状态是否为0,不是则说明有的线程还未执行完任务,阻塞等待
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CyclicBarrier
cyclic Barrier 是一个可循环使用的屏障,它常常被用来和countdownlatch作比较
它就像一个屏障,让线程执行完任务后遇到屏障阻塞,直到所有线程都执行完任务(都到达屏障),并且它是可重复使用的
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("所有线程到达屏障后,优先执行构造规定的runnable"); }); Thread t1 = new Thread(() -> { //执行任务 task(cyclicBarrier); }, "t1"); Thread t2 = new Thread(() -> { //执行任务 task(cyclicBarrier); }, "t2"); Thread t3 = new Thread(() -> { //执行任务 task(cyclicBarrier); }, "t3"); t1.start(); t2.start(); t3.start();
task方法中会执行await阻塞直到所有线程到达屏障
private static void task(CyclicBarrier cyclicBarrier) { System.out.println(Thread.currentThread() + "执行任务..."); try { TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); System.out.println("所有线程都执行完, " + Thread.currentThread() + "走出屏障"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }
cyclic barrier会记录需要多少线程到达屏障,并且通过代来达到重复使用
使用reentrant lock 在await中加锁、解锁,每当一个线程到达屏障(执行await时),都会进行自减,如果不为0会阻塞,自减到0时说明所有线程到达屏障,唤醒其他线程,并更新新的代
Exchange
Exchanger用于线程间的协作,可以用来交换变量
Exchanger<String> exchanger = new Exchanger(); new Thread(() -> { String A = "A"; try { //B System.out.println(exchanger.exchange(A)); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); String B = "B"; try { //A String A = exchanger.exchange(B); System.out.println("A=" + A + " B=" + B); } catch (InterruptedException e) { e.printStackTrace(); }
当一个线程先执行exchange时会等待另一个线程执行,等到另一个线程exchange时则唤醒等待的线程
总结
本篇文章围绕前置知识AQS原理,来实现自定义的同步组件,并对并发包中常用同步组件的功能和原理进行说明
继承AQS后,只需要实现尝试获取、释放同步状态等方法就可以自定义同步组件
ReentrantLock 是由AQS实现的独占式可重入锁,初始值同步状态为0;获取锁时,如果是无锁则尝试CAS自增,成功就获取了锁;如果有锁则判断获取锁的线程是不是当前线程,是则说明是可重入锁自增次数;在释放锁时由于可重入的关系,只有自减为0才是真正释放锁
ReentrantLock 还提供响应中断、超时、公平锁的其他功能,公平锁实现只需要加上获取锁的前提:在AQS中FIFO排队,前驱节点为首节点
ReentrantReadWriteLock 提供共享的读锁和独占的写锁,将锁的状况更加细粒度,将同步状态高低16位拆分为读、写的状态,在读多写少的场景并发性能会更好;在获取写锁时,如果有读锁那么会阻塞,如果有写锁会查看是否为可重入;在获取读锁时,没有写锁就可以获取,如果写锁是当前线程也可以获取
信号量用于控制线程访问资源,初始化自定义的信号量数量,线程访问资源时先获取信号量,获取到信号量才能够访问资源;使用共享式来实现,由于可能多个线程同时获取、释放信号量,在实现时都需要使用CAS+失败重试保证原子性
CountDownLatch 用于计数,可以用于一个线程执行N个任务,也可以用于多个线程执行1个任务,当执行完任务使用countdown 来对同步状态进行扣减,执行await方法时只要同步状态不为0就会阻塞线程,直到所有任务执行完(将同步状态扣减完)
CyclicBarrier 是可循环使用的屏障,用于多线程到达屏障后,需要等待其他线程都到达屏障才继续执行;使用reentrant lock 和 代 来实现,调用await时自减,当计数为0时说明所有线程到达屏障,唤醒其他阻塞的线程
Exchange 用于线程间的协作,能够交换线程间的变量
最后(不要白嫖,一键三连求求拉~)
本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/C_AQSComponent
Github-JavaConcurrentProgramming/src/main/java/C_AQSComponent
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多干货,公众号:菜菜的后端私房菜。