Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

简介: Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

195d03d17afc4a928bc581f313b01dfe.png


Pre


Java Review - 并发编程_ CountDownLatch原理&源码剖析介绍的CountDownLatch在解决多个线程同步方面相对于调用线程的join方法已经有了不少优化,但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。


所以为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。


这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。


之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。


小Demo


需求如下: 使用两个线程去执行一个被分解的任务A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/18 15:01
 * @mark: show me the code , change the world
 */
public class CycleBarrierTest {
    // 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " merge result"));
    public static void main(String[] args) {
        // 创建一个线程数量固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 将线程A 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " begin to handle  task1-1");
                System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 将线程B 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " begin to handle  task1-2");
                System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 关闭线程池
        executorService.shutdown();
    }
}

af9718ff528c4fa68c3299111b126241.png


【每次运行的结果可能不尽相同,但核心的流程是一致的】


首先创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。


在main函数里面首先创建了一个大小为2的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用await方法。一开始计数器值为2,当第一个线程调用await方法时,计数器值会递减为1。


由于此时计数器值不为0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用await时,会进入屏障,计数器值也会递减.


现在计数器值为0,这时就会去执行CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下运行。


由此可见多个线程之间是相互等待的,假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。


对于这个例子来说,使用CountDownLatch也可以得到类似的输出结果。下面再举个例子来说明CyclicBarrier的可复用性。


需求: 假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/18 15:23
 * @mark: show me the code , change the world
 */
public class CycleBarrierTest2 {
    // 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " 阶段任务全部线程执行结束....开启下一轮"));
    public static void main(String[] args) {
        // 创建一个线程数量固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 将线程A 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " execute step1");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step2");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 将线程B 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " execute step1");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step2");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 关闭线程池
        executorService.shutdown();
    }
}

533ba140ed774becafe750247b318abb.png


在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2。然后在阶段2后面调用了await方法,这保证了所有线程都完成了阶段2后,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的。


类图结构


b705be66671f4c4fb326a571bff24cb9.png


由以上类图可知:


CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。


parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行


count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点


为何维护parties和count两个变量,只使用count不就可以了?


别忘了CycleBarier是可以被复用的,使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

还有一个变量barrierCommand也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用lock首先保证了更新计数器count的原子性。另外使用lock的条件变量trip支持线程间使用await和signal操作进行同步。


最后,在变量generation内部有一个变量broken,其用来记录当前屏障是否被打破。注意,这里的broken并没有被声明为volatile的,因为是在锁内使用变量,所以不需要声明。

 /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }


CyclicBarrier核心方法源码解读


int await()


当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回


parties个线程都调用了await()方法,也就是线程都到了屏障点


其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回


与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


通过源码可以知道,在内部调用了dowait方法。第一个参数为false则说明不设置超时时间,这时候第二个参数没有意义


int await(long timeout, TimeUnit unit)


当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:


parties个线程都调用了await()方法,也就是线程都到了屏障点,这时候返回true;


设置的超时时间到了后返回false;


其他线程调用当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常然后返回;


与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回。

   public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

在内部调用了dowait方法。第一个参数为true则说明设置了超时时间,这时候第二个参数是超时时间。


int dowait(boolean timed, long nanos)

CyclicBarrier的核心功能

 /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            //  1 如果index=0说明到所有线程都到达了屏障点,此时执行初始化时执行的任务
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                      // 2 执行任务
                        command.run();
                    ranAction = true;
                    // 3 激活其他线程因为await方法而被阻塞的线程,并重置CyclicBarrier
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
      // 4 如果index !=0 
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try { 
                  // 5 没有设置超时时间
                    if (!timed)
                        trip.await();
                   // 6 设置超时时间     
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
      // 7 从条件队列里唤醒里面的阻塞贤臣
        // signal completion of last generation
        trip.signalAll();
        // 8 重置CyclicBarrier
        // set up next generation
        count = parties;
        generation = new Generation();
    }

以上是dowait方法的核心代码


当一个线程调用了dowait方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用线程会被阻塞。


然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=0所以当前线程会执行代码(4)


如果当前线程调用的是无参数的await()方法,则这里timed=false,所以当前线程会被放入条件变量trip的条件阻塞队列,当前线程会被挂起并释放获取的lock锁。


如果调用的是有参数的await方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。


当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时已经有9个线程被放入了条件变量trip的条件队列里面。


最后count=index等于0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置CyclicBarrier,然后这10个线程就可以继续向下运行了。


小结


我们这里通过Demo说明了CycleBarrier与CountDownLatch的不同在于,CycleBarrier是可以复用的,并且CycleBarrier特别适合分段任务有序执行的场景。


然后分析了CycleBarrier通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。

相关文章
|
6月前
|
存储 缓存 Java
我们来详细讲一讲 Java NIO 底层原理
我是小假 期待与你的下一次相遇 ~
214 2
|
5月前
|
监控 Java API
现代 Java IO 高性能实践从原理到落地的高效实现路径与实战指南
本文深入解析现代Java高性能IO实践,涵盖异步非阻塞IO、操作系统优化、大文件处理、响应式网络编程与数据库访问,结合Netty、Reactor等技术落地高并发应用,助力构建高效可扩展的IO系统。
153 0
|
6月前
|
存储 算法 安全
Java中的对称加密算法的原理与实现
本文详细解析了Java中三种常用对称加密算法(AES、DES、3DES)的实现原理及应用。对称加密使用相同密钥进行加解密,适合数据安全传输与存储。AES作为现代标准,支持128/192/256位密钥,安全性高;DES采用56位密钥,现已不够安全;3DES通过三重加密增强安全性,但性能较低。文章提供了各算法的具体Java代码示例,便于快速上手实现加密解密操作,帮助用户根据需求选择合适的加密方案保护数据安全。
419 58
|
5月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
180 24
|
6月前
|
XML JSON Java
Java 反射:从原理到实战的全面解析与应用指南
本文深度解析Java反射机制,从原理到实战应用全覆盖。首先讲解反射的概念与核心原理,包括类加载过程和`Class`对象的作用;接着详细分析反射的核心API用法,如`Class`、`Constructor`、`Method`和`Field`的操作方法;最后通过动态代理和注解驱动配置解析等实战场景,帮助读者掌握反射技术的实际应用。内容翔实,适合希望深入理解Java反射机制的开发者。
563 13
|
5月前
|
存储 缓存 安全
深入讲解 Java 并发编程核心原理与应用案例
本教程全面讲解Java并发编程,涵盖并发基础、线程安全、同步机制、并发工具类、线程池及实际应用案例,助你掌握多线程开发核心技术,提升程序性能与响应能力。
231 0
|
6月前
|
算法 Java 索引
说一说 Java 并发队列原理剖析
我是小假 期待与你的下一次相遇 ~
|
6月前
|
安全 Java 编译器
JD-GUI,java反编译工具及原理: JavaDecompiler一个Java反编译器
Java Decompiler (JD-GUI) 是一款由 Pavel Kouznetsov 开发的图形化 Java 反编译工具,支持 Windows、Linux 和 Mac Os。它能将 `.class` 文件反编译为 Java 源代码,支持多文件标签浏览、高亮显示,并兼容 Java 5 及以上版本。JD-GUI 支持对整个 Jar 文件进行反编译,可跳转源码,适用于多种 JDK 和编译器。其原理基于将字节码转换为抽象语法树 (AST),再通过反编译生成代码。尽管程序可能带来安全风险,但可通过代码混淆降低可读性。最新版修复了多项识别错误并优化了内存管理。
3128 1