深入理解Java并发工具包中的CyclicBarrier

简介: 深入理解Java并发工具包中的CyclicBarrier

前言

CyclicBarrier的字面意思是“可循环使用的屏障”。它允许一组线程互相等待,直到所有线程都到达一个公共的屏障点(或称为同步点)。在这个屏障点上,线程会被阻塞,直到所有参与的线程都到达这个点。一旦所有线程都到达屏障点,屏障就会被打开,允许所有线程继续执行。

这个“循环”的概念意味着,一旦所有线程通过屏障,屏障就会自动重置,可以再次用于下一轮的线程同步。这使得CyclicBarrier非常适合于那些需要多次同步的场景。

一、CyclicBarrier的内部机制

CyclicBarrier的内部实现基于一个计数器和一个条件变量(通常是一个锁和相关的等待/通知机制)。每当一个线程调用await()方法时,它会首先检查计数器的值是否达到了在创建CyclicBarrier时指定的“阈值”(即需要等待的线程数)。如果计数器尚未达到阈值,线程就会被阻塞,并等待其他线程的到来。


当另一个线程也调用await()方法时,计数器的值会增加,并且会再次检查是否达到了阈值。如果达到了阈值,那么所有等待在屏障点的线程都会被唤醒,并继续执行。此时,计数器会被重置为0,屏障进入下一轮的使用。


此外,CyclicBarrier还提供了一个可选的Runnable参数。当所有线程都到达屏障点时,这个Runnable任务会在最后一个到达屏障点的线程中执行。这通常用于进行一些额外的初始化、汇总或清理工作。


需要注意的是,如果某个线程在等待过程中因为中断或异常而退出,那么所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。这是因为屏障已经被“破坏”,无法再保证所有线程都能正常通过。

二、源码分析CyclicBarrier的实现原理

CyclicBarrier允许一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier point)。为了深入理解其实现原理,我们将结合CyclicBarrier的源码进行分析。

2.1 主要属性和构造函数

CyclicBarrier的主要属性包括:

  • parties:表示必须调用await()方法的线程数量,即屏障的阈值。
  • count:当前已到达屏障的线程数量。
  • barrierCommand:当所有线程到达屏障时执行的可选任务。
  • generation:用于标识当前屏障的“代”或循环次数。每当屏障被打破或所有线程通过屏障时,它都会增加。

构造函数允许设置parties(必须到达的线程数)和可选的barrierAction(所有线程到达屏障时执行的任务)。

2.2 await()方法

await()方法是CyclicBarrier的核心。当线程调用此方法时,它会执行以下步骤:

  1. 检查是否有线程由于中断或异常而退出,导致屏障处于“破坏”状态。如果是,则抛出BrokenBarrierException
  1. 如果当前线程不是最后一个到达屏障的线程,则将其放入等待队列中,并可能因等待而被挂起。
  2. 如果当前线程是最后一个到达屏障的线程,则执行以下操作:
  • 如果存在barrierCommand,则在当前线程中执行它。
  • 唤醒所有等待在屏障上的线程。
  • 重置count为0,并增加generation的值,以表示屏障已进入下一个循环。

以下是CyclicBarrierawait()方法的一个简化版源码分析(实际源码包含更多的错误处理和优化):

public int await() throws InterruptedException, BrokenBarrierException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // not the last thread to arrive, wait until all others arrive
                if (!trip.await(this, timeout, unit))
                    throw new TimeoutException(); // not actually in real code, for simplicity
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // Another thread must have interrupted us; we're about to notify them
                    // and if this was our interrupt, we'll throw it again below
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // spinning wait for next generation
            Condition r = generation.register(count = parties - 1);
            // reset count to parties on each generation change
            // yield in case we're waiting for other threads
            while (count == parties - 1)
                Thread.yield(); // spin-wait
            // arrive at new generation
            r.signalAll();
        }
    } finally {
        lock.unlock();
    }
}

// Helper methods not shown for brevity: breakBarrier(), nextGeneration(), etc.
  • CyclicBarrier通过内部锁和条件变量来协调线程的等待和唤醒。
  • 当线程调用await()方法时,它会检查屏障的状态,并根据需要挂起或继续执行。
  • 如果所有线程都到达了屏障,则会执行可选的任务,并重置屏障以供下一轮使用。
  • 如果线程在等待过程中被中断或出现异常,则屏障可能会被标记为“破坏”状态,导致所有等待的线程都收到异常。

这种机制确保了线程之间的同步和协作能够以一种高效且可靠的方式进行。

二、CyclicBarrier的使用

2.1 CyclicBarrier使用场景

CyclicBarrier的使用场景非常广泛,特别是在需要将一个大任务拆分成多个小任务,并且这些小任务之间存在依赖关系的场景中。以下是一些具体的使用案例:

  1. 并行计算流水线:在并行计算中,常常需要将一个大任务拆分成多个阶段,每个阶段由一组线程完成。每个阶段都依赖于前一个阶段的结果。在这种情况下,可以使用CyclicBarrier来同步每个阶段的线程,确保它们都完成后再进入下一个阶段。
  2. 多线程测试:在进行多线程测试时,可能需要创建一组线程来模拟并发用户。为了确保所有线程都准备好后再开始测试,可以使用CyclicBarrier来同步它们的状态。
  3. 资源初始化:在某些情况下,可能需要一组线程共同完成某个资源的初始化工作。使用CyclicBarrier可以确保所有线程都完成初始化后再继续执行后续任务。

2.2 CyclicBarrier实现并行计算任务

下面代码中我们将模拟一个简单的并行计算任务,其中几个线程需要等待彼此完成后才能继续执行。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

    public static void main(String[] args) {
        // 设置屏障的阈值为3,意味着需要3个线程到达屏障后才会继续执行
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("所有线程都已到达屏障,继续执行后续任务。");
        });

        // 创建并启动3个线程,每个线程将执行不同的任务并在到达屏障时等待其他线程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始执行任务...");
                try {
                    // 模拟执行任务的时间
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 任务执行完毕,等待其他线程...");
                try {
                    // 到达屏障,等待其他线程
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 通过屏障,可以继续执行后续任务...");
            }).start();
        }
    }
}
  • 我们创建了一个CyclicBarrier对象,设置其阈值为3,并提供了一个当所有线程到达屏障时执行的可选任务。
  • 然后我们创建了3个线程,每个线程都会执行一些任务,然后调用cyclicBarrier.await()方法到达屏障并等待其他线程。
  • 当所有3个线程都到达屏障时,屏障的操作将被执行,然后所有线程可以继续执行后续任务。

注意,由于线程调度的不确定性,每个线程打印的消息顺序可能会有所不同,但是你会看到“所有线程都已到达屏障,继续执行后续任务。”这条消息总是在所有线程都到达屏障后打印出来的。这证明了CyclicBarrier在协调多个线程同步点方面的作用。

三、CyclicBarrier与CountDownLatch的区别与联系

虽然CyclicBarrierCountDownLatch都是用于同步多个线程的工具类,但它们之间存在一些关键的区别和联系:

  1. 可重用性CyclicBarrier是可循环使用的。一旦所有线程通过屏障,它就会自动重置为初始状态,可以再次用于下一轮的线程同步。而CountDownLatch是一次性的,一旦计数器减到0,就不能再重用了。
  2. 计数方式CyclicBarrier的计数器是递增的,直到达到指定的线程数(阈值)。而CountDownLatch的计数器是递减的,每次调用countDown()方法都会使计数器减1。
  3. 使用场景:由于CyclicBarrier具有可重用性,它更适合于那些需要多次同步的场景,比如并行计算流水线或多次重复执行的多线程任务。而CountDownLatch则更适合于那些只需要一次同步的场景,比如等待一组线程完成初始化工作后再继续执行后续任务。
  4. 异常处理:当某个线程在等待过程中因为中断或异常而退出时,CyclicBarrier和CountDownLatch的处理方式也有所不同。对于CyclicBarrier,所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。而对于CountDownLatch,异常的处理取决于具体的实现和调用方式(比如是否使用了await(long timeout, TimeUnit unit)方法)。

四、总结

CyclicBarrier是Java并发包中提供的一个强大且灵活的同步工具类。它允许一组线程在一个公共的屏障点上互相等待,直到所有线程都到达这个点后再继续执行后续任务。这使得它在处理复杂的多线程同步问题时非常有用。通过深入理解CyclicBarrier的内部机制和使用场景,我们可以更好地利用它来编写高效、可靠且易于维护的并发程序。

相关文章
|
4月前
|
安全 Java 编译器
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
86 2
|
24天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
29天前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
48 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
29 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
Java
JAVA并发编程系列(9)CyclicBarrier循环屏障原理分析
本文介绍了拼多多面试中的模拟拼团问题,通过使用 `CyclicBarrier` 实现了多人拼团成功后提交订单并支付的功能。与之前的 `CountDownLatch` 方法不同,`CyclicBarrier` 能够确保所有线程到达屏障点后继续执行,并且屏障可重复使用。文章详细解析了 `CyclicBarrier` 的核心原理及使用方法,并通过代码示例展示了其工作流程。最后,文章还提供了 `CyclicBarrier` 的源码分析,帮助读者深入理解其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
56 5
|
4月前
|
存储 算法 Java
Java 中的同步集合和并发集合
【8月更文挑战第22天】
47 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
56 2
|
4月前
|
Java
Java 中 CyclicBarrier 和 CountDownLatch 的区别
【8月更文挑战第22天】
117 4