Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

简介: Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

195d03d17afc4a928bc581f313b01dfe.png


Pre


Java Review - 并发编程_ CountDownLatch原理&源码剖析介绍的CountDownLatch在解决多个线程同步方面相对于调用线程的join方法已经有了不少优化,但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。


所以为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。


这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。


之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。


小Demo


需求如下: 使用两个线程去执行一个被分解的任务A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/18 15:01
 * @mark: show me the code , change the world
 */
public class CycleBarrierTest {
    // 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " merge result"));
    public static void main(String[] args) {
        // 创建一个线程数量固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 将线程A 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " begin to handle  task1-1");
                System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 将线程B 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " begin to handle  task1-2");
                System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 关闭线程池
        executorService.shutdown();
    }
}

af9718ff528c4fa68c3299111b126241.png


【每次运行的结果可能不尽相同,但核心的流程是一致的】


首先创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。


在main函数里面首先创建了一个大小为2的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用await方法。一开始计数器值为2,当第一个线程调用await方法时,计数器值会递减为1。


由于此时计数器值不为0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用await时,会进入屏障,计数器值也会递减.


现在计数器值为0,这时就会去执行CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下运行。


由此可见多个线程之间是相互等待的,假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。


对于这个例子来说,使用CountDownLatch也可以得到类似的输出结果。下面再举个例子来说明CyclicBarrier的可复用性。


需求: 假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/18 15:23
 * @mark: show me the code , change the world
 */
public class CycleBarrierTest2 {
    // 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " 阶段任务全部线程执行结束....开启下一轮"));
    public static void main(String[] args) {
        // 创建一个线程数量固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 将线程A 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " execute step1");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step2");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 将线程B 提交到线程池
        executorService.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " execute step1");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step2");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " execute step3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        // 关闭线程池
        executorService.shutdown();
    }
}

533ba140ed774becafe750247b318abb.png


在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2。然后在阶段2后面调用了await方法,这保证了所有线程都完成了阶段2后,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的。


类图结构


b705be66671f4c4fb326a571bff24cb9.png


由以上类图可知:


CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。


parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行


count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点


为何维护parties和count两个变量,只使用count不就可以了?


别忘了CycleBarier是可以被复用的,使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

还有一个变量barrierCommand也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用lock首先保证了更新计数器count的原子性。另外使用lock的条件变量trip支持线程间使用await和signal操作进行同步。


最后,在变量generation内部有一个变量broken,其用来记录当前屏障是否被打破。注意,这里的broken并没有被声明为volatile的,因为是在锁内使用变量,所以不需要声明。

 /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }


CyclicBarrier核心方法源码解读


int await()


当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回


parties个线程都调用了await()方法,也就是线程都到了屏障点


其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回


与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


通过源码可以知道,在内部调用了dowait方法。第一个参数为false则说明不设置超时时间,这时候第二个参数没有意义


int await(long timeout, TimeUnit unit)


当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:


parties个线程都调用了await()方法,也就是线程都到了屏障点,这时候返回true;


设置的超时时间到了后返回false;


其他线程调用当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常然后返回;


与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回。

   public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

在内部调用了dowait方法。第一个参数为true则说明设置了超时时间,这时候第二个参数是超时时间。


int dowait(boolean timed, long nanos)

CyclicBarrier的核心功能

 /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            //  1 如果index=0说明到所有线程都到达了屏障点,此时执行初始化时执行的任务
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                      // 2 执行任务
                        command.run();
                    ranAction = true;
                    // 3 激活其他线程因为await方法而被阻塞的线程,并重置CyclicBarrier
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
      // 4 如果index !=0 
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try { 
                  // 5 没有设置超时时间
                    if (!timed)
                        trip.await();
                   // 6 设置超时时间     
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
      // 7 从条件队列里唤醒里面的阻塞贤臣
        // signal completion of last generation
        trip.signalAll();
        // 8 重置CyclicBarrier
        // set up next generation
        count = parties;
        generation = new Generation();
    }

以上是dowait方法的核心代码


当一个线程调用了dowait方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用线程会被阻塞。


然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=0所以当前线程会执行代码(4)


如果当前线程调用的是无参数的await()方法,则这里timed=false,所以当前线程会被放入条件变量trip的条件阻塞队列,当前线程会被挂起并释放获取的lock锁。


如果调用的是有参数的await方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。


当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时已经有9个线程被放入了条件变量trip的条件队列里面。


最后count=index等于0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置CyclicBarrier,然后这10个线程就可以继续向下运行了。


小结


我们这里通过Demo说明了CycleBarrier与CountDownLatch的不同在于,CycleBarrier是可以复用的,并且CycleBarrier特别适合分段任务有序执行的场景。


然后分析了CycleBarrier通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。

目录
打赏
0
0
0
0
99
分享
相关文章
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
51 5
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
24 7
Java汽车租赁系统源码(含数据库脚本)
Java汽车租赁系统源码(含数据库脚本)
28 4
|
11天前
|
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
53 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
Java Review(三十三、异常处理----补充:断言、日志、调试)
Java Review(三十三、异常处理----补充:断言、日志、调试)
195 0
|
3月前
|
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
214 60
【Java并发】【线程池】带你从0-1入门线程池
|
22天前
|
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
57 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
102 23

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等