前言
之前我们已经讲解了不少JUC的内容,今天我们要讲解另一个方面们就是JUC提供的并发工具类
一、并发工具类用处
JUC内,并发工具包可以帮助开发者更方便地编写高效稳定的并发程序。其实,如果前面的知识你认真学了,尤其是关于AQS的部分内容,那么这些工具包的功能用户其实可以自行实现,但工具类的提供使得我们省的重复造轮子。
以下是JUC并发工具类的一些用途和说明,后面将会详细讲解
二、CountDownLatch
1. 用途
它可以让某个线程等待一个或多个线程完成操作后再执行,常用于控制一个或多个线程等待其他线程完成操作后再执行某个操作。例如,一个线程需要等待多个子线程全部执行完毕后再进行汇总计算,就可以使用CountDownLatch来实现
2. 示例
使用CountDownLatch阻塞主线程,其他线程执行 countDown() ,最后使得主线程恢复
public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(3); // 定义初始度为3,即3次减度后将放开阻塞 final CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("子线程" + Thread.currentThread().getName() + "开始执行"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("子线程"+Thread.currentThread().getName()+"执行完成"); latch.countDown();//当前线程调用此方法,则计数减一 } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成..."); latch.await();//阻塞当前线程,直到计数器的值为0 System.out.println("主线程"+Thread.currentThread().getName()+"开始执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
3. 原理
主要是继承了AQS,其本质是一个互斥锁,只不过不同于一般的互斥锁,一般的互斥锁上锁解锁都是以 1 为单位,所以执行一次释放,就能唤醒后面的线程来竞争锁
而如上诉用例里,CountDownLatch 则一开始就上了一个力度为 3 的锁,这样当主线程使用await的时候实际是在申请锁,且只要当前状态不为0,就阻塞。而每次其他线程把这个数字减一,都会唤醒主线程,当然主线程又会来检测当前状态是否为0。如此反复,直到最后其真的为0,则竞争到锁,得以继续执行。因此,一个这种锁只能用一次。
三、Semaphore
1. 用途
它可以控制同时访问某个资源的线程数量,常用于控制并发请求的流量。例如,一个Web服务器同时只能处理有限的请求数量,就可以使用Semaphore来控制并发请求的数量。
2. 示例
public class SemaphoreDemo { // 可同时受理业务的窗口数量(同时并发执行的线程数) public static int threadTotal = 2; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); for (int i = 0; i < 10; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(1); resolve(count); semaphore.release(1); } catch (Exception e) { log.error("exception", e); } }); } executorService.shutdown(); } private static void resolve(int i) throws InterruptedException { log.info("服务号{},受理业务中。。。", i); Thread.sleep(2000); } }
3. 原理
主要是继承了AQS,其本质是一个共享锁,只不过不同于普通共享锁不做数量限制,本共享锁初始值是一个共享额度,当一个线程申请后,少于额度时则阻塞本线程。当一个线程释放本额度时,则唤醒所有线程,所以这里还可以选择是否使用公平锁
如实例,设置了共享额度为2,则每一个进来的线程会将值减1,当第三个线程进入时,只能阻塞。等到有线程释放额度时,它就能被唤醒,进而竞争这个额度了。
四、CyclicBarrier
1. 用途
它可以让一组线程互相等待,直到所有线程都到达一个屏障点后再一起执行,常用于多线程计算结果的合并。例如,多个分布式计算节点需要将自己的计算结果合并到一起,就可以使用CyclicBarrier来实现。
2. 示例
public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); //周末3人聚会,需要等待3个人全部到齐餐厅后才能开始吃饭 CyclicBarrier cb = new CyclicBarrier(3); System.out.println("初始化:有" + (3 - cb.getNumberWaiting()) + "个人正在赶来餐厅"); for (int i = 0; i < 3; i++) { //设置用户的编号 final int person = i; executor.execute(() -> { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + "---用户" + person + "即将达到餐厅," + "用户" + person + "到达餐厅了。" + "当前已有" + (cb.getNumberWaiting() + 1) + "个人到达餐厅"); cb.await(); System.out.println("三个人都到到餐厅啦," + Thread.currentThread().getName() + "开始吃饭了"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } executor.shutdown(); //关闭线程池 } }
3. 原理
CyclicBarrier相对于其他并发工具类来说,要复杂一点,它没有重写AQS的代码,它的阻塞是也并非像上述两者一样是靠阻塞队列实现线程阻塞,而是依赖条件队列的阻塞:当某个条件满足,则唤醒条件队列里所有的线程。而且它还有 执行额外任务和可重置 的特点,我们看它的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // 利用两个数存阈值,一个数每次递减,另一个数不变,方便重置时恢复原值 this.parties = parties; this.count = parties; // 入参包含可运行代码段,达到条件即可执行 this.barrierCommand = barrierAction; }
而其主要代码,我们也稍微讲解下
逻辑其实并不复杂,只是其实现比较不一样罢了
五、总结
- CountDownLatch : 自己一个人阻塞,要等指定的人数帮助后恢复(只能用一次)
- Semaphore : 固定通道大小,只允许指定人数进入
- CyclicBarrier :来一个阻塞一个,最后阻塞到指定人数后,所有阻塞的一起恢复