简介
CyclicBarrier(可重用屏障/栅栏) 类似于 CountDownLatch(倒计数闭锁),它能阻塞一组线程直到某个事件的发生。
与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
闭锁用于等待事件,而屏障用于等待其他线程。
CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。
CyclicBarrier允许一组线程相互等待,直到到达某个公共的屏障点,通过CyclicBarrier,可以实现多个线程间相互等待,直到所有的线程都准备好,等待条件可以重用,又称为循环屏障,可以用于多线程计算数据,最终汇总计算结果的场景。
应用场景
CyclicBarrier 常用于多线程分组计算。
比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。
CountDownLatch和CyclicBarrier区别
CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置;CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。
CyclicBarrier 原理
CyclicBarrier 内部使用了 ReentrantLock 和 Condition 两个类。
案例一
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } } // 输出 21:36:05.124 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready 21:36:06.123 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready 21:36:07.123 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready 21:36:08.123 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready 21:36:09.124 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 is ready 21:36:09.124 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 continue 21:36:09.124 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue 21:36:09.124 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue 21:36:09.124 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue 21:36:09.124 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue 21:36:10.124 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 is ready 21:36:11.125 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 is ready 21:36:12.125 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 is ready 21:36:13.126 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 is ready 21:36:14.127 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 is ready 21:36:14.127 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 continue 21:36:14.127 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 continue 21:36:14.127 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 continue 21:36:14.127 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 continue 21:36:14.127 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 continue Process finished with exit code 0
分析 await 方法
在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
线程调用 await() 表示自己已经到达栅栏。
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
await() 时被中断或者超时。