深入理解Java并发工具包中的CyclicBarrier

简介: 深入理解Java并发工具包中的CyclicBarrier

前言

CyclicBarrier的字面意思是“可循环使用的屏障”。它允许一组线程互相等待,直到所有线程都到达一个公共的屏障点(或称为同步点)。在这个屏障点上,线程会被阻塞,直到所有参与的线程都到达这个点。一旦所有线程都到达屏障点,屏障就会被打开,允许所有线程继续执行。

这个“循环”的概念意味着,一旦所有线程通过屏障,屏障就会自动重置,可以再次用于下一轮的线程同步。这使得CyclicBarrier非常适合于那些需要多次同步的场景。

一、CyclicBarrier的内部机制

CyclicBarrier的内部实现基于一个计数器和一个条件变量(通常是一个锁和相关的等待/通知机制)。每当一个线程调用await()方法时,它会首先检查计数器的值是否达到了在创建CyclicBarrier时指定的“阈值”(即需要等待的线程数)。如果计数器尚未达到阈值,线程就会被阻塞,并等待其他线程的到来。


当另一个线程也调用await()方法时,计数器的值会增加,并且会再次检查是否达到了阈值。如果达到了阈值,那么所有等待在屏障点的线程都会被唤醒,并继续执行。此时,计数器会被重置为0,屏障进入下一轮的使用。


此外,CyclicBarrier还提供了一个可选的Runnable参数。当所有线程都到达屏障点时,这个Runnable任务会在最后一个到达屏障点的线程中执行。这通常用于进行一些额外的初始化、汇总或清理工作。


需要注意的是,如果某个线程在等待过程中因为中断或异常而退出,那么所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。这是因为屏障已经被“破坏”,无法再保证所有线程都能正常通过。

二、源码分析CyclicBarrier的实现原理

CyclicBarrier允许一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier point)。为了深入理解其实现原理,我们将结合CyclicBarrier的源码进行分析。

2.1 主要属性和构造函数

CyclicBarrier的主要属性包括:

  • parties:表示必须调用await()方法的线程数量,即屏障的阈值。
  • count:当前已到达屏障的线程数量。
  • barrierCommand:当所有线程到达屏障时执行的可选任务。
  • generation:用于标识当前屏障的“代”或循环次数。每当屏障被打破或所有线程通过屏障时,它都会增加。

构造函数允许设置parties(必须到达的线程数)和可选的barrierAction(所有线程到达屏障时执行的任务)。

2.2 await()方法

await()方法是CyclicBarrier的核心。当线程调用此方法时,它会执行以下步骤:

  1. 检查是否有线程由于中断或异常而退出,导致屏障处于“破坏”状态。如果是,则抛出BrokenBarrierException
  1. 如果当前线程不是最后一个到达屏障的线程,则将其放入等待队列中,并可能因等待而被挂起。
  2. 如果当前线程是最后一个到达屏障的线程,则执行以下操作:
  • 如果存在barrierCommand,则在当前线程中执行它。
  • 唤醒所有等待在屏障上的线程。
  • 重置count为0,并增加generation的值,以表示屏障已进入下一个循环。

以下是CyclicBarrierawait()方法的一个简化版源码分析(实际源码包含更多的错误处理和优化):

public int await() throws InterruptedException, BrokenBarrierException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // not the last thread to arrive, wait until all others arrive
                if (!trip.await(this, timeout, unit))
                    throw new TimeoutException(); // not actually in real code, for simplicity
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // Another thread must have interrupted us; we're about to notify them
                    // and if this was our interrupt, we'll throw it again below
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // spinning wait for next generation
            Condition r = generation.register(count = parties - 1);
            // reset count to parties on each generation change
            // yield in case we're waiting for other threads
            while (count == parties - 1)
                Thread.yield(); // spin-wait
            // arrive at new generation
            r.signalAll();
        }
    } finally {
        lock.unlock();
    }
}

// Helper methods not shown for brevity: breakBarrier(), nextGeneration(), etc.
  • CyclicBarrier通过内部锁和条件变量来协调线程的等待和唤醒。
  • 当线程调用await()方法时,它会检查屏障的状态,并根据需要挂起或继续执行。
  • 如果所有线程都到达了屏障,则会执行可选的任务,并重置屏障以供下一轮使用。
  • 如果线程在等待过程中被中断或出现异常,则屏障可能会被标记为“破坏”状态,导致所有等待的线程都收到异常。

这种机制确保了线程之间的同步和协作能够以一种高效且可靠的方式进行。

二、CyclicBarrier的使用

2.1 CyclicBarrier使用场景

CyclicBarrier的使用场景非常广泛,特别是在需要将一个大任务拆分成多个小任务,并且这些小任务之间存在依赖关系的场景中。以下是一些具体的使用案例:

  1. 并行计算流水线:在并行计算中,常常需要将一个大任务拆分成多个阶段,每个阶段由一组线程完成。每个阶段都依赖于前一个阶段的结果。在这种情况下,可以使用CyclicBarrier来同步每个阶段的线程,确保它们都完成后再进入下一个阶段。
  2. 多线程测试:在进行多线程测试时,可能需要创建一组线程来模拟并发用户。为了确保所有线程都准备好后再开始测试,可以使用CyclicBarrier来同步它们的状态。
  3. 资源初始化:在某些情况下,可能需要一组线程共同完成某个资源的初始化工作。使用CyclicBarrier可以确保所有线程都完成初始化后再继续执行后续任务。

2.2 CyclicBarrier实现并行计算任务

下面代码中我们将模拟一个简单的并行计算任务,其中几个线程需要等待彼此完成后才能继续执行。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

    public static void main(String[] args) {
        // 设置屏障的阈值为3,意味着需要3个线程到达屏障后才会继续执行
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("所有线程都已到达屏障,继续执行后续任务。");
        });

        // 创建并启动3个线程,每个线程将执行不同的任务并在到达屏障时等待其他线程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始执行任务...");
                try {
                    // 模拟执行任务的时间
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 任务执行完毕,等待其他线程...");
                try {
                    // 到达屏障,等待其他线程
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 通过屏障,可以继续执行后续任务...");
            }).start();
        }
    }
}
  • 我们创建了一个CyclicBarrier对象,设置其阈值为3,并提供了一个当所有线程到达屏障时执行的可选任务。
  • 然后我们创建了3个线程,每个线程都会执行一些任务,然后调用cyclicBarrier.await()方法到达屏障并等待其他线程。
  • 当所有3个线程都到达屏障时,屏障的操作将被执行,然后所有线程可以继续执行后续任务。

注意,由于线程调度的不确定性,每个线程打印的消息顺序可能会有所不同,但是你会看到“所有线程都已到达屏障,继续执行后续任务。”这条消息总是在所有线程都到达屏障后打印出来的。这证明了CyclicBarrier在协调多个线程同步点方面的作用。

三、CyclicBarrier与CountDownLatch的区别与联系

虽然CyclicBarrierCountDownLatch都是用于同步多个线程的工具类,但它们之间存在一些关键的区别和联系:

  1. 可重用性CyclicBarrier是可循环使用的。一旦所有线程通过屏障,它就会自动重置为初始状态,可以再次用于下一轮的线程同步。而CountDownLatch是一次性的,一旦计数器减到0,就不能再重用了。
  2. 计数方式CyclicBarrier的计数器是递增的,直到达到指定的线程数(阈值)。而CountDownLatch的计数器是递减的,每次调用countDown()方法都会使计数器减1。
  3. 使用场景:由于CyclicBarrier具有可重用性,它更适合于那些需要多次同步的场景,比如并行计算流水线或多次重复执行的多线程任务。而CountDownLatch则更适合于那些只需要一次同步的场景,比如等待一组线程完成初始化工作后再继续执行后续任务。
  4. 异常处理:当某个线程在等待过程中因为中断或异常而退出时,CyclicBarrier和CountDownLatch的处理方式也有所不同。对于CyclicBarrier,所有等待在屏障点的线程都将收到一个BrokenBarrierException异常。而对于CountDownLatch,异常的处理取决于具体的实现和调用方式(比如是否使用了await(long timeout, TimeUnit unit)方法)。

四、总结

CyclicBarrier是Java并发包中提供的一个强大且灵活的同步工具类。它允许一组线程在一个公共的屏障点上互相等待,直到所有线程都到达这个点后再继续执行后续任务。这使得它在处理复杂的多线程同步问题时非常有用。通过深入理解CyclicBarrier的内部机制和使用场景,我们可以更好地利用它来编写高效、可靠且易于维护的并发程序。

相关文章
|
5月前
|
Java 大数据 Go
从混沌到秩序:Java共享内存模型如何通过显式约束驯服并发?
并发编程旨在混乱中建立秩序。本文对比Java共享内存模型与Golang消息传递模型,剖析显式同步与隐式因果的哲学差异,揭示happens-before等机制如何保障内存可见性与数据一致性,展现两大范式的深层分野。(238字)
168 4
|
5月前
|
缓存 安全 Java
如何理解Java中的并发?
Java并发指多任务交替执行,提升资源利用率与响应速度。通过线程实现,涉及线程安全、可见性、原子性等问题,需用synchronized、volatile、线程池及并发工具类解决,是高并发系统开发的关键基础。(238字)
326 5
|
8月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
450 0
|
8月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
428 83
|
8月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
466 83
|
9月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
258 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
10月前
|
缓存 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(3-1):并发共享问题的解决与分析
活锁:多个线程相互影响对方退出同步代码块的条件而导致线程一直运行的情况。例如,线程1的退出条件是count=5,而线程2和线程3在其代码块中不断地是count进行自增自减的操作,导致线程1永远运行。内存一致性问题:由于JIT即时编译器对缓存的优化和指令重排等造成的内存可见性和有序性问题,可以通过synchronized,volatile,并发集合类等机制来解决。这里的线程安全是指,多个线程调用它们同一个实例的方法时,是线程安全的,但仅仅能保证当前调用的方法是线程安全的,不同方法之间是线程不安全的。
179 0
|
10月前
|
Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(3-2):并发共享问题的解决与分析
wait方法和notify方法都是Object类的方法:让当前获取锁的线程进入waiting状态,并进入waitlist队列:让当前获取锁的线程进入waiting状态,并进入waitlist队列,等待n秒后自动唤醒:在waitlist队列中挑一个线程唤醒:唤醒所有在waitlist队列中的线程它们都是之间协作的手段,只有拥有对象锁的线程才能调用这些方法,否则会出现IllegalMonitorStateException异常park方法和unpark方法是LockSupport类中的方法。
186 0
|
10月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
389 0