前言
CyclicBarrier的字面意思是“可循环使用的屏障”。它允许一组线程互相等待,直到所有线程都到达一个公共的屏障点(或称为同步点)。在这个屏障点上,线程会被阻塞,直到所有参与的线程都到达这个点。一旦所有线程都到达屏障点,屏障就会被打开,允许所有线程继续执行。
这个“循环”的概念意味着,一旦所有线程通过屏障,屏障就会自动重置,可以再次用于下一轮的线程同步。这使得CyclicBarrier非常适合于那些需要多次同步的场景。
一、CyclicBarrier的内部机制
CyclicBarrier的内部实现基于一个计数器和一个条件变量(通常是一个锁和相关的等待/通知机制)。每当一个线程调用await()方法时,它会首先检查计数器的值是否达到了在创建CyclicBarrier时指定的“阈值”(即需要等待的线程数)。如果计数器尚未达到阈值,线程就会被阻塞,并等待其他线程的到来。
当另一个线程也调用await()方法时,计数器的值会增加,并且会再次检查是否达到了阈值。如果达到了阈值,那么所有等待在屏障点的线程都会被唤醒,并继续执行。此时,计数器会被重置为0,屏障进入下一轮的使用。
此外,CyclicBarrier还提供了一个可选的Runnable参数。当所有线程都到达屏障点时,这个Runnable任务会在最后一个到达屏障点的线程中执行。这通常用于进行一些额外的初始化、汇总或清理工作。
需要注意的是,如果某个线程在等待过程中因为中断或异常而退出,那么所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。这是因为屏障已经被“破坏”,无法再保证所有线程都能正常通过。
二、源码分析CyclicBarrier的实现原理
CyclicBarrier
允许一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier point)。为了深入理解其实现原理,我们将结合CyclicBarrier
的源码进行分析。
2.1 主要属性和构造函数
CyclicBarrier
的主要属性包括:
parties
:表示必须调用await()
方法的线程数量,即屏障的阈值。count
:当前已到达屏障的线程数量。barrierCommand
:当所有线程到达屏障时执行的可选任务。generation
:用于标识当前屏障的“代”或循环次数。每当屏障被打破或所有线程通过屏障时,它都会增加。
构造函数允许设置parties
(必须到达的线程数)和可选的barrierAction
(所有线程到达屏障时执行的任务)。
2.2 await()方法
await()
方法是CyclicBarrier
的核心。当线程调用此方法时,它会执行以下步骤:
- 检查是否有线程由于中断或异常而退出,导致屏障处于“破坏”状态。如果是,则抛出
BrokenBarrierException
。
- 如果当前线程不是最后一个到达屏障的线程,则将其放入等待队列中,并可能因等待而被挂起。
- 如果当前线程是最后一个到达屏障的线程,则执行以下操作:
- 如果存在
barrierCommand
,则在当前线程中执行它。
- 唤醒所有等待在屏障上的线程。
- 重置
count
为0,并增加generation
的值,以表示屏障已进入下一个循环。
以下是CyclicBarrier
中await()
方法的一个简化版源码分析(实际源码包含更多的错误处理和优化):
public int await() throws InterruptedException, BrokenBarrierException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // not the last thread to arrive, wait until all others arrive if (!trip.await(this, timeout, unit)) throw new TimeoutException(); // not actually in real code, for simplicity } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // Another thread must have interrupted us; we're about to notify them // and if this was our interrupt, we'll throw it again below Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; // spinning wait for next generation Condition r = generation.register(count = parties - 1); // reset count to parties on each generation change // yield in case we're waiting for other threads while (count == parties - 1) Thread.yield(); // spin-wait // arrive at new generation r.signalAll(); } } finally { lock.unlock(); } } // Helper methods not shown for brevity: breakBarrier(), nextGeneration(), etc.
CyclicBarrier
通过内部锁和条件变量来协调线程的等待和唤醒。- 当线程调用
await()
方法时,它会检查屏障的状态,并根据需要挂起或继续执行。 - 如果所有线程都到达了屏障,则会执行可选的任务,并重置屏障以供下一轮使用。
- 如果线程在等待过程中被中断或出现异常,则屏障可能会被标记为“破坏”状态,导致所有等待的线程都收到异常。
这种机制确保了线程之间的同步和协作能够以一种高效且可靠的方式进行。
二、CyclicBarrier的使用
2.1 CyclicBarrier使用场景
CyclicBarrier
的使用场景非常广泛,特别是在需要将一个大任务拆分成多个小任务,并且这些小任务之间存在依赖关系的场景中。以下是一些具体的使用案例:
- 并行计算流水线:在并行计算中,常常需要将一个大任务拆分成多个阶段,每个阶段由一组线程完成。每个阶段都依赖于前一个阶段的结果。在这种情况下,可以使用
CyclicBarrier
来同步每个阶段的线程,确保它们都完成后再进入下一个阶段。 - 多线程测试:在进行多线程测试时,可能需要创建一组线程来模拟并发用户。为了确保所有线程都准备好后再开始测试,可以使用
CyclicBarrier
来同步它们的状态。 - 资源初始化:在某些情况下,可能需要一组线程共同完成某个资源的初始化工作。使用
CyclicBarrier
可以确保所有线程都完成初始化后再继续执行后续任务。
2.2 CyclicBarrier实现并行计算任务
下面代码中我们将模拟一个简单的并行计算任务,其中几个线程需要等待彼此完成后才能继续执行。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { // 设置屏障的阈值为3,意味着需要3个线程到达屏障后才会继续执行 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("所有线程都已到达屏障,继续执行后续任务。"); }); // 创建并启动3个线程,每个线程将执行不同的任务并在到达屏障时等待其他线程 for (int i = 0; i < 3; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 开始执行任务..."); try { // 模拟执行任务的时间 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 任务执行完毕,等待其他线程..."); try { // 到达屏障,等待其他线程 cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 通过屏障,可以继续执行后续任务..."); }).start(); } } }
- 我们创建了一个
CyclicBarrier
对象,设置其阈值为3,并提供了一个当所有线程到达屏障时执行的可选任务。 - 然后我们创建了3个线程,每个线程都会执行一些任务,然后调用
cyclicBarrier.await()
方法到达屏障并等待其他线程。 - 当所有3个线程都到达屏障时,屏障的操作将被执行,然后所有线程可以继续执行后续任务。
注意,由于线程调度的不确定性,每个线程打印的消息顺序可能会有所不同,但是你会看到“所有线程都已到达屏障,继续执行后续任务。”这条消息总是在所有线程都到达屏障后打印出来的。这证明了CyclicBarrier
在协调多个线程同步点方面的作用。
三、CyclicBarrier与CountDownLatch的区别与联系
虽然CyclicBarrier
和CountDownLatch
都是用于同步多个线程的工具类,但它们之间存在一些关键的区别和联系:
- 可重用性:
CyclicBarrier
是可循环使用的。一旦所有线程通过屏障,它就会自动重置为初始状态,可以再次用于下一轮的线程同步。而CountDownLatch
是一次性的,一旦计数器减到0,就不能再重用了。 - 计数方式:
CyclicBarrier
的计数器是递增的,直到达到指定的线程数(阈值)。而CountDownLatch
的计数器是递减的,每次调用countDown()
方法都会使计数器减1。 - 使用场景:由于
CyclicBarrier
具有可重用性,它更适合于那些需要多次同步的场景,比如并行计算流水线或多次重复执行的多线程任务。而CountDownLatch
则更适合于那些只需要一次同步的场景,比如等待一组线程完成初始化工作后再继续执行后续任务。 - 异常处理:当某个线程在等待过程中因为中断或异常而退出时,CyclicBarrier和CountDownLatch的处理方式也有所不同。对于CyclicBarrier,所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。而对于CountDownLatch,异常的处理取决于具体的实现和调用方式(比如是否使用了await(long timeout, TimeUnit unit)方法)。
四、总结
CyclicBarrier是Java并发包中提供的一个强大且灵活的同步工具类。它允许一组线程在一个公共的屏障点上互相等待,直到所有线程都到达这个点后再继续执行后续任务。这使得它在处理复杂的多线程同步问题时非常有用。通过深入理解CyclicBarrier的内部机制和使用场景,我们可以更好地利用它来编写高效、可靠且易于维护的并发程序。