前言
大家好,我是小郭,上一篇主要使用了CountDownLatch进行并发流程的控制,但是有另外一个工具它也能实现并发的控制,那就是CyclicBarrier,在玩游戏的时候我们经常遇到,我的进度条已经到了100%,但是却一直进不了游戏,这就是因为需要相互等待,等所有人准备完毕,才能开始。
了解CyclicBarrier
CyclicBarrier 也是一种多线程并发控制的实用工具,和 CountDownLatch 一样具有等待计数的功能,但是相比于 CountDownLatch 功能更加强大,CountDownLatch是一次性的。
主要作用使用它进行线程的同步,进行计数循环、重置。
可以理解为,去火锅店吃火锅,吃完一桌走人,再重新来一桌人。
思考问题
- CountDownLatch和CyclicBarrier有什么区别?
- CyclicBarrier适用于什么场景?
- CyclicBarrier为什么可以循环使用?
主要参数与方法
public class CyclicBarrier { /** The lock for guarding barrier entry */ // 可重入锁 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 条件队列 private final Condition trip = lock.newCondition(); /** The number of parties */ // 参与的线程数量 private final int parties; /* The command to run when tripped */ // 由最后一个进入 barrier 的线程执行的操作 private final Runnable barrierCommand; /** The current generation */ // 实例 private Generation generation = new Generation(); // 正在等待进入等待的线程数量 private int count; }
//将线程处于休眠状态,所有的线程到达临界点 public int await() throws InterruptedException, BrokenBarrierException //更新状态,唤醒 all nextGeneration() //是否处于断开状态 boolean isBroken() //获取临界点上的线程数 int getNumberWaiting() //重置为初始状态 reset()
构造方法
public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
核心方法
dowait
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { // 生成一个实例,CyclicBarrier类存在一个内部类Generation, // 每一次使用的CycBarrier可以当成Generation的实例 final Generation g = generation; // broken判断是否为断开,初始化为false if (g.broken) throw new BrokenBarrierException(); // 判断线程是否终止 if (Thread.interrupted()) { // 如果终止,broken设置为true,唤醒所有 breakBarrier(); throw new InterruptedException(); } // 自减一 int index = --count; // 都到达临界点,出发临界点任务 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { // 任务执行完毕,唤醒所有,broken设置为断开 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (; ; ) { try { // timed默认为false,挂起当前任务 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { // 断开屏障 breakBarrier(); throw ie; } else { // 中断当前线程 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // 返回当前序号 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
CyclicBarrier为什么可以循环使用?
/** * 更新障碍流程的状态并唤醒所有人。 * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
当count为0的时候,就会生成新的屏障,进行初始化的工作同时唤醒所有人
实践
同样还是利用四个人吃火锅在作为例子。通过调用await相互等待,到了临界点的时候出发临界点的线程任务。
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4,() ->{ System.out.println("人到齐了,开吃!"); }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); IntStream.range(0,16).forEach(i ->{ executorService.submit(() ->{ System.out.println(Thread.currentThread().getName() + "来吃火锅"); try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "动筷子"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); }); executorService.shutdown(); }
输出结果
pool-1-thread-1来吃火锅 pool-1-thread-4来吃火锅 pool-1-thread-3来吃火锅 pool-1-thread-2来吃火锅 人到齐了,开吃! pool-1-thread-1来吃火锅 pool-1-thread-3来吃火锅 pool-1-thread-4来吃火锅 pool-1-thread-2来吃火锅 人到齐了,开吃!
场景:T1线程查询库存剩余数量,T2线程校验付款金额,T3线程完成扣减库存
下面是伪代码的实现
// 初始化数量和金额 private static final AtomicInteger num = new AtomicInteger(1); private static final Integer money = 2; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,() ->{ int count = num.decrementAndGet(); System.out.println(Thread.currentThread().getName()+ "扣减库存完成,剩余库存" + count ); }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(1); IntStream.range(0,10).forEach(i ->{ executorService.submit(() ->{ CheckAll(); }); }); executorService.shutdown(); } public static void CheckAll(){ // 查询库存数量 Thread t1 = new Thread(() ->{ if (num.get() > 0){ System.out.println("库存数量充足"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }else { System.out.println("库存不足"); } }); t1.start(); System.out.println(t1.getName() + "启动"); // 查询付款金额 Thread t2 = new Thread(() ->{ if (num.get() * money > 0){ System.out.println("付款金额正确"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }else { System.out.println("付款失败"); } }); t2.start(); System.out.println(t2.getName() + "启动"); }
在这里我们需要注意,线程池大小为1是必要的,如果设置为多个,有可能会两个线程 A 和 B 同时查询,A 的订单先返回,B 的派送单先返回,造成队列中的数据不匹配,所以1个线程实现生产数据串行执行,保证数据安全。
对比一下CountDownLacth与CyclicBarrier
- CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,一起完成。
- CountDownLacth是一次性的,而CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。
- CyclicBarrier 可以设置回调函数
- 调用 CountDownLatch 的 countDown 方法后,当前线程并不会阻塞,会继续往下执行;而调用 CyclicBarrier 的 await 方法,会阻塞当前线程,直到 CyclicBarrier 指定的线程全部都到达了指定点的时候,才能继续往下执行