一、 CountDownLatch
这是个在平时开发中出现频率较高的并发工具,它是一个 倒计数器
。是一个非常实用的多线程控制工具类,这个工具类常常用来控制线程等待,可以让一个线程等待直到计数器结束再开始执行!
我们不必一开始就深究源码,先会用再善用。因此我们简单看个简易的例子
《一个都不能少》
小王老师是一个严格的老师,她上课有些许任性,必须等到所有学生(10名)都到场后才会开始上课,也就是但凡一个学生不在场,都不会开课。
我们要遵循 一个都不能少 的要求,也就是当学生人数 < 总人数的时候不能执行上课的这个动作。那么这个时候我们应该怎么处理这个问题呢?
我们课前点名,增加一个 if
判断,当人数不满足的情况下,就不会进入到 上课 的动作中。这个可能是一个惯性思维,大部分同学都会这样操作。那么问题来了,有些学生可能只是因为迟到,错过了点名的判断,当 if 执行结束后就不会再判断,那么错过就是错过,尽管后续人数已经到齐了,但最终是开不了课的!
想想再改进下,如果因为 if 只判断一次而造成的问题,那我们能不能一直判断,那就可以用到了while
或者 for
一直循环判断。解决思路是正确的,那我们就顺藤引出 CountDownLatch 的用法
代码不长,但不知道结果是否如我们所愿:
我们可以看到,当10
名学生都达到后,小王老师开始上课了,但如果我们这是一个学生没到达呢?
当达到人数未符合预期,则不能正常上课,目前看已经满足我们的需求了。那我们紧接着模拟一下学生迟到的场景~
依然是学号为 10 的同学,虽迟但到,课还是可以正常进行上的!
看来 CountDownLatch 真是一个好工具,简简单单就帮我们解决了该问题!那他怎么解决的呢?
CountDownLatch 是通过一个计数器来实现的,首先设置一个计数器的初始值。每当完成一个任务后,计数器的值就会减1,当计数器达到0
时,它表示所有任务都已经完成,然后在闭锁上等待的线程可以恢复执行任务.
我们首先可以要看的是 CountDownLatch 的构造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
该方法需要初始化一个计数值,并初始化一个 Sync
, 我们这个时候不妨大胆猜测,CountDownLatch
底层便是靠 Sync
实现的!我们来看看 Sync 是个啥玩意?
可以看到在Sync内部维护着一个安全变量 state
,它的值便是 计数器的值。其中有两个重要方法:tryAcquireShared(int acquires)
和 tryReleaseShared(int releases)
。那这两个方法有什么用呢?
我们可以先回到 CountDownLatch 类中,上面我们已经看到该类构造函数的作用,接下来需要认识其中两个重要的方法:countDown()
和 await()
。在我们看来,countDown()
方法便是用来将计数值减 1, await()
方法是用来阻塞判断计数值是否为 0?那我们进入对应方法看是如何实现的
public void countDown() { sync.releaseShared(1); } --- public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
这两个方法调用的都是 AQS
中的两个方法:(我这边直接贴源码注释,仔细看哦~!)
countDown()
await()
上面便是 CountDownLatch 的实现,那我们不妨想想该工具类在实时系统中的使用场景:
- 实现最大的并行性
当我们想要同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类,如果我们创建一个初始值为 1 的CountDownLatch,并让所有线程都在这个锁上等待,那么我们就可以很轻松的完成测试,只需要调用一次 **countDown()**方法就可以让所有等待线程同时恢复执行
- 开始执行前等待 n 个线程完成各自的任务
当我们应用程序执行前,确保某些前置动作需要执行
- 死锁检测
我们可以使用 n 个线程访问共享资源,在每次测试阶段的线程数目是不同的,这样可以尝试产生死锁
二、CyclicBarrier
CyclicBarrier
是另外一种多线程并发控制工具。Cyclic 意为循环,也就是说这个计数器可以反复使用,它比CountDownLatch
更加强大一点,它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程达到屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。
也就是说 CyclicBarrier 是加法计时器,我们一样通过以上 《一个都不能少》
例子来示例如何使用
这里就不再演示缺课与迟到的示范,与上述 CountDownLatch 实现方式一致
这里我们依然关注两个方法,一个是构造方法
,一个是 await()
我们依然进入到 CyclicBarrier 类中查看构造方法
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
可以发现和上面说到的 CountDownLatch 还是有出入的,该构造方法只是做了屏障点
的记录,我们重点还是要看 await()
方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
追根朔底我们得看 dowait()
方法,进入方法可以发现实现方式并不复杂。由于代码有点长,我们截取重点说明
与 CountDownLatch 不同的是,屏障点变量并没有使用 volatile
修饰,那么就得毋庸就得加锁使之线程安全!
以上便是 CyclicBarrier 的整个实现过程,具体咱就不抠细节了~!
CyclicBarrier 和 CountDownLatch 还是有点类似的,但是我们要清楚他们之间的区别:
- CountDownLatch: 一个线程(或多个),等待另外 N 个线程完成某件事情之后才会执行
- CyclicBarrier: N 个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待
比较重要的一点:CountDownLatch 不可重复利用,CyclicBarrier 不可重复利用
三、Semaphore
信号量(Semaphore)是为多线程提供了更为强大的控制方法。从广义上来讲,信号量是对锁的扩展。无论是内部锁synchronized
还是重入锁ReentrantLock
,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个共享资源。
我们简单看个简易的例子
《抢车位》
原本一个小区有 5 个地上停车位已经可以很好的满足业主的停车需求,但是这两年车辆数暴增,几乎家家一车,车位自然供不应求,那只能遵循先到先得的原则!
然后我们看下执行结果:
可以看到 5 个车位是共享资源,只有先到的业主才能抢到车位,当抢到车位的业主离开后,后续的业主才能进入获取到车位!
我们提取出关注点构造方法
、acquire()
、release()
构造方法
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
是的,Semaphore 有两个构造方法,区别在于是否使用公平锁
。然后我们继续看 aquire()
、release()
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); }
excuse me~? 前面有认真看的小伙伴,肯定觉得眼熟了,这调用的方法岂不是和上面 CountDownLatch
的一样?是的,这两个并发工具类,底层都是 调用 AQS
的线程方法。如果不知道这两个方法作用的同学,可以上翻查看,这里不再赘述!
根据这个工具类结合上述例子,我们可以在流量控制
的时候使用!特别是公共资源有限的应用场景,比如数据库连接,假如有一个需求要读取几万个文件的数据,因为都是 IO 密集型的任务,我们可以启动几十个线程去并发地读取,但是我们得经过硬盘->内存->数据库
,而如果数据库的连接数只有10个,那我们这个时候就必须要控制只有 10 个线程可以同时获取数据库连接保存数据,这个时候就可以使用 Semaphore
来做流量控制~!
四、Exchanger
看到这个名称,不知道有多少小伙伴脑子里想的是 这是啥?
。实话说,这个工具类出镜率真不高,用的比较少。Exchanger 是一个用于线程间协作的工具类。它可用于线程间的数据交换,它提供了一个同步点,两个线程可以交换彼此的数据,。这两个线程通过 Exchanger 方法交换数据,如果第一个线程先执行 exchange()
方法, 它会一直等待第二个线程也执行 exchanger()
方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
这里注意的是 两个线程
,不存在**"三角关系"**
在没有经过 exchange()
时,数字线程 打印的应该是数字,字母线程打印的应该是字母,但是经过了 exchange()
结果就发生了逆转:
注意:
如果两个线程中有一个没有执行 exchange() 方法,那么则会一直等待
为了避免这种情况的发生,我们可以在 exchange()
中加上超时时间!
那么这个工具类有什么应用场景呢?我们想想如果在一个线程的执行任务中创建某个对象的生产代价很高,而另外一个线程任务也需要消费到这个对象,那我们就可以借助 Exchanger
来帮助我们传输类对象。甚至于可以实现 生产者-消费者
模式!
以上便是几种并发工具类的使用与应用场景,当然上面提到的应用场景只是一小部分,更多的当然需要在开发中继续挖掘,做到会用且善用