java源码 - CyclicBarrier

简介: 开篇CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

开篇

  • CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

  • CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

  • CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。


CyclicBarrier用法demo

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...


CyclicBarrier类定义

  • parties记录一共等待执行个数,count记录依然等待执行的个数。
  • barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
  • Generation的代的概念来实现CyclicBarrier的复用。
  • 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier {
    // 代的类定义
    private static class Generation {
        boolean broken = false;
    }

    // 内部通过ReentrantLock实现线程安全的等待
    private final ReentrantLock lock = new ReentrantLock();
    // 内部通过Lock的condition实现所有waiter的信号通知
    private final Condition trip = lock.newCondition();
    // 所有等待执行的个数
    private final int parties;
    // 所有等待线程都完成任务后由最后一个线程执行的命令
    private final Runnable barrierCommand;
   
    // 通过代的概念实现复用
    private Generation generation = new Generation();

    // 还在等待的个数
    private int count;

    // 核心构造函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }


CyclicBarrier工作原理

  • CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
  • 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
  • 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
  • 线程的休眠和唤醒都是基于ReentrantLock来实现的
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 通过lock来保证线程安全
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 判断generation过期的情况
            if (g.broken)
                throw new BrokenBarrierException();
            // 判断线程中断情况
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 递减待执行的个数计数
            int index = --count;
            // 所有待执行任务完成后执行barrierCommand命令
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // barrierCommand命令不为null的时候执行该命令
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 已经执行了barrierCommand
                    ranAction = true;
                    // 重置generation用以复用并且唤醒所有等待的线程
                    //       private void nextGeneration() {
                    //           trip.signalAll();
                    //           count = parties;
                    //           generation = new Generation();
                    //        }
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果count的值不为0,那么当前线程就开始进入等待
            // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
            for (;;) {
                try {
                    if (!timed)
                        // private final Condition trip = lock.newCondition();
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                
                 // 各种后置处理逻辑
                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


参考文章

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

目录
相关文章
|
1天前
|
监控 前端开发 Java
Java基于B/S医院绩效考核管理平台系统源码 医院智慧绩效管理系统源码
医院绩效考核系统是一个关键的管理工具,旨在评估和优化医院内部各部门、科室和员工的绩效。一个有效的绩效考核系统不仅能帮助医院实现其战略目标,还能提升医疗服务质量,增强患者满意度,并促进员工的专业成长
8 0
|
1天前
|
Java 云计算
Java智能区域医院云HIS系统SaaS源码
云HIS提供标准化、信息化、可共享的医疗信息管理系统,实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程,提升患者满意度、基层首诊率,通过信息共享、辅助诊疗等手段,提高基层医生的服务能力构建和谐的基层医患关系。
16 2
|
2天前
|
Java
从源码出发:JAVA中对象的比较
从源码出发:JAVA中对象的比较
9 3
|
2天前
|
前端开发 Java 关系型数据库
Java医院绩效考核系统源码B/S架构+springboot三级公立医院绩效考核系统源码 医院综合绩效核算系统源码
作为医院用综合绩效核算系统,系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
20 2
|
4天前
|
传感器 人工智能 前端开发
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
智慧校园电子班牌,坐落于班级的门口,适合于各类型学校的场景应用,班级学校日常内容更新可由班级自行管理,也可由学校统一管理。让我们一起看看,电子班牌有哪些功能呢?
45 4
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
|
4天前
|
分布式计算 Java API
Java8 Lambda实现源码解析
Java8的lambda应该大家都比较熟悉了,本文主要从源码层面探讨一下lambda的设计和实现。
|
5天前
|
存储 Java
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
16 2
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
|
5天前
|
SQL Java 分布式数据库
实现HBase表和RDB表的转化(附Java源码资源)
该文介绍了如何将数据从RDB转换为HBase表,主要涉及三个来源:RDB Table、Client API和Files。文章重点讲解了RDB到HBase的转换,通过批处理思想,利用RDB接口批量导出数据并转化为`List&lt;Put&gt;`,然后导入HBase。目录结构包括配置文件、RDB接口及实现类、HBase接口及实现类,以及一个通用转换器接口和实现。代码中,`RDBImpl`负责从RDB读取数据并构造`Put`对象,`HBaseImpl`则负责将`Put`写入HBase表。整个过程通过配置文件`transfer.properties`管理HBase和RDB的映射关系。
21 3
实现HBase表和RDB表的转化(附Java源码资源)
|
8天前
|
消息中间件 缓存 Java
java基于云部署的SaaS医院云HIS系统源码 心理CT、B超 lis、电子病历
云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工作站等一系列常规功能,还能与公卫、PACS等各类外部系统融合,实现多层机构之间的融合管理。
41 12
|
11天前
|
人工智能 监控 Java
java互联网+智慧工地云平台SaaS源码
智慧工地以施工现场风险预知和联动预控为目标,将智能AI、传感技术、人像识别、监控、虚拟现实、物联网、5G、大数据、互联网等新一代科技信息技术植入到建筑、机械、人员穿戴设施、场地进出关口等各类设备中,实现工程管理与工程施工现场的整合
22 0