内容摘要
CyclicBarrier的优点在于实现了线程间的相互等待与协同,确保所有线程在达到预定屏障点后才能继续执行,它支持屏障的重复使用,非常适合多轮次的任务同步,此外,CyclicBarrier还允许在屏障点执行特定操作,为复杂的多线程协作提供了便利。
核心概念
业务场景
CyclicBarrier
允许一组线程互相等待,直到所有线程都到达某个屏障(barrier)点,然后这些线程可以继续执行后续的任务,这个屏障是可以循环使用的,也就是说,当所有线程都达到屏障点后,屏障会自动重置,等待下一轮的线程到来。
举一个实际业务中的例子:假设有一个大型电商公司,在每年的“双十一”大促期间,都需要进行大量的商品数据预处理工作,以应对即将到来的购物高峰。这个预处理工作包括很多步骤,比如商品信息的校验、库存的更新、价格的调整等等,由于数据量巨大,公司决定采用多线程的方式来加速处理过程。
公司可以将整个预处理任务划分为多个子任务,每个子任务由一个独立的线程来完成,但是,这些子任务之间存在一定的依赖关系,比如某个子任务需要等待其他子任务完成后才能开始执行。
在这类场景中可以使用CyclicBarrier
实现多个线程之间的等待,可以将每个子任务的结束点设置为一个屏障点,当所有子任务都完成并达到这个屏障点时,说明整个预处理工作的第一阶段已经完成,可以开始第二阶段的任务了,然后,CyclicBarrier
会自动重置,等待下一轮的线程到来。
这种方式以确保每个阶段的任务都按照预定的顺序执行,同时充分利用多线程的优势,提高处理效率。
技术场景
CyclicBarrier
位于java.util.concurrent
包中,通常用来解决以下几类技术方面的问题:
- 线程同步:当多个线程需要同时进行某些操作,而这些操作需要在所有线程都准备好之后才能开始时,
CyclicBarrier
可以用来同步这些线程,它可以让一组线程在某个点上等待,直到所有线程都达到这个点,然后这些线程才可以继续执行。 - 资源分解与任务划分:在处理大量数据或执行复杂任务时,通常会将任务分解成多个子任务,由不同的线程并行处理,
CyclicBarrier
可以确保在所有子任务完成之前,不会有线程提前进入下一个处理阶段,从而保证了数据的一致性和任务的顺序性。 - 循环使用:与
CountDownLatch
不同,CyclicBarrier
是可以重复使用的,一旦所有线程都达到了屏障点,屏障会自动重置,这样就可以用于多轮的任务同步。 - 异常处理:
CyclicBarrier
还提供了一个特性,即当线程在屏障点等待时,如果某个线程因为异常而中断,那么它可以传播这个异常给其他正在等待的线程,这样可以让所有线程都对异常情况作出响应。 - 线程间的协作:在某些场景中,线程之间需要紧密协作,比如生产者-消费者模式中的多个消费者线程需要等待所有生产者线程完成生产后才能开始消费,
CyclicBarrier
可以提供一个集中的同步点,简化线程间的协作逻辑。
官方文档:https://docx.iamqiang.com/jdk11/api/java.base/java/util/concurrent/CyclicBarrier.html
代码案例
下面是一个使用CyclicBarrier
的简单示例代码,模拟了一个多线程任务,其中每个线程代表一个工人,他们需要完成各自的工作部分,然后在一个屏障点等待其他工人完成工作,一旦所有工人都完成了工作,他们将一起进行下一个阶段的工作,如下代码案例:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalWorker = 5; // 工人总数
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalWorker); // 创建一个CyclicBarrier实例,并指定总工作线程数
for (int i = 0; i < totalWorker; i++) {
new Thread(() -> {
System.out.println("工人" + Thread.currentThread().getId() + "已准备就绪");
try {
// 线程在此等待,直到所有线程都达到这个屏障点
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("工人" + Thread.currentThread().getId() + "开始下一阶段的工作");
}).start();
}
}
}
在上面代码中,CyclicBarrier
被设置为需要等待5个线程(工人)全部就绪,每个线程启动后,都会打印出一条消息表示它已经准备就绪,然后调用cyclicBarrier.await()
方法进入等待状态,只有当所有的线程都调用了await()
方法后,它们才会继续执行,并打印出下一条消息表示开始下一阶段的工作。
如下输出内容:
工人1已准备就绪
工人10已准备就绪
工人9已准备就绪
工人8已准备就绪
工人11已准备就绪
工人8开始下一阶段的工作
工人10开始下一阶段的工作
工人9开始下一阶段的工作
工人11开始下一阶段的工作
工人1开始下一阶段的工作
核心API
下面是CyclicBarrier
中一些主要方法的含义:
CyclicBarrier(int parties)
,构造方法,创建一个新的CyclicBarrier
实例,并设置需要等待的线程数(即参与方数量),parties
表示需要等待的线程数,当这么多线程调用await()
方法后,屏障才会打开,允许线程继续执行。CyclicBarrier(int parties, Runnable barrierAction)
,构造方法,除了设置需要等待的线程数外,还指定了一个当所有线程都达到屏障点时执行的任务(即屏障操作),barrierAction
是一个Runnable
对象,它的run()
方法会在所有线程都到达屏障点后被一个线程调用,barrierAction
只会在当前屏障点运行一次,如果屏障被重置,下次所有线程到达时不会再次执行该操作。int await() throws InterruptedException, BrokenBarrierException
,此方法用于让当前线程在屏障点等待,直到所有线程都达到这个屏障点,如果当前线程不是最后一个到达屏障点的线程,那么它会被阻塞,直到所有线程都到达,如果当前线程是最后一个到达的,并且构造方法中指定了barrierAction
,那么该操作会由当前线程或另一个线程执行(具体取决于实现),如果在等待过程中线程被中断,或者屏障被其他线程破坏(通过调用reset()
方法),那么此方法会抛出异常,返回值是到达屏障点的当前线程的到达顺序,但是这个特性在实际应用中很少使用。int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
,这个方法与上一个await()
方法类似,但是允许指定一个最大等待时间,如果在指定的时间内所有线程都到达了屏障点,那么行为与await()
相同,如果超过了指定的时间还没有所有线程到达,那么这个方法会抛出TimeoutException
。int getParties()
,返回在CyclicBarrier
中需要等待的线程数。int getNumberWaiting()
,返回当前在屏障点等待的线程数。boolean isBroken()
,如果屏障被破坏(可能是因为某个线程在等待时被中断,或者调用了breakBarrier()
方法),那么这个方法返回true
。void reset()
,将屏障重置为初始状态。这会导致所有当前在屏障点等待的线程抛出BrokenBarrierException
,并且屏障可以被重新使用。
注意:CyclicBarrier
是用来让固定数量的线程互相等待的,而不是用来同步访问共享资源的,对于共享资源的同步访问,应该使用其他同步工具,比如synchronized
关键字、Lock
接口的实现(如ReentrantLock
),或者并发集合等。
核心总结
CyclicBarrier是Java中的一个并发工具类,它允许一组线程互相等待,直到所有线程都达到某个屏障点,然后这些线程才能继续执行。
优点:
- 它可以重复使用,非常适合多轮任务同步。
- 提供了线程间的协作机制,确保任务分阶段完成。
- 可以指定屏障点操作,当所有线程到达时自动执行。
缺点:
- 如果线程在等待时被中断或取消,可能会导致BrokenBarrierException。
- 不适合用于同步访问共享资源,更多是用于任务划分和同步点控制。
END!
END!
END!
往期回顾
精品文章
Java并发基础:CopyOnWriteArraySet全面解析
Java并发基础:ConcurrentSkipListMap全面解析
Java并发基础:ConcurrentSkipListSet全面解析!