面试官: CyclicBarrier有了解过吗?说说看(源码剖析)

简介: 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~


Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解CyclicBarrier,一起来看下吧~


CyclicBarrier

这个跟我们上节讲的CountDownLatch有点类似,从字面意思讲是相当于一个可循环的屏障,他与CountDownLatch不同的是它可以重复利用,下一步的操作以,依赖上一步是否完成,就像去银行办业务一样,排在你前面的人办好了才轮到你,我们继续通过上节的例子,来改写一下它,这里我偷个懒,实际业务中尽量用类编写,不要直接new Thread

public class CyclicBarrierTest {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(1);
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println("worker 1------> " + i);
                    cyclicBarrier.await();
                    Thread.sleep(2000);
                    System.out.println("worker 2------> " + i);
                    cyclicBarrier.await();
                    Thread.sleep(2000);
                    System.out.println("worker 3------> " + i);
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        });
        System.out.println("completed !");
    }
}
复制代码


实际输出:

completed !
worker 1------> 9
worker 1------> 0
worker 1------> 6
worker 1------> 7
worker 1------> 5
worker 1------> 4
worker 1------> 1
worker 1------> 3
worker 1------> 2
worker 1------> 8
worker 2------> 7
worker 2------> 6
worker 2------> 5
worker 2------> 2
worker 2------> 3
worker 2------> 1
worker 2------> 8
worker 2------> 0
worker 2------> 9
worker 2------> 4
worker 3------> 6
worker 3------> 3
worker 3------> 2
worker 3------> 5
worker 3------> 7
worker 3------> 8
worker 3------> 1
worker 3------> 0
worker 3------> 9
worker 3------> 4
复制代码

可以看到在即使在多线程下,每个操作都需要上一个await任务之后执行,使用很简单,也很好理解。


知其然知其所以然 & 源码剖析

下面我们就一起探究一下,它是如何做到的?


同样的,我们先看构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
复制代码


默认barrierAction是null, 这个参数是Runnable参数,当最后线程达到的时候执行的任务,刚刚的例子中没有演示,大家可以在初始化的时候传入一个,打印一下当前的线程名称,这样理解起来比较容易点,parties int型,它的意思是参与的线程数。


我们再看它的定义, 可以看到它没有继承任何类或实现任何接口

public class CyclicBarrier { .... }
复制代码


await

我们重点看下这个方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
复制代码


这个方法干嘛用的呢❓等到所有各方都在此屏障上调用了await 。如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,除了以下情况:

  • 最后一个线程到达;或者
  • 其他一些线程中断当前线程;或者
  • 其他一些线程中断了其他等待线程之一;或者
  • 其他一些线程在等待屏障时超时;或者
  • 其他一些线程在此屏障上调用reset 。


再看dowait(), 它是一个私有方法

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 全局锁        
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 每次使用屏障都会生成一个实例
            // private Generation generation = new Generation();
            final Generation g = generation;
            // broken字面意思破坏,如果被破坏了就抛异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 线程中断检测
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 剩余的等待线程数
            int index = --count;
            // 最后线程到达时 
            if (index == 0) {  // tripped
                // 标记任务是否被执行(就是传进入的runable参数)
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 完成后 进行下一组 初始化 generation 初始化 count 并唤醒所有等待的线程 
                    // 
                    // private void nextGeneration() {
                    //     // signal completion of last generation
                    //     trip.signalAll();
                    //     // set up next generation
                    //     count = parties;
                    //     generation = new Generation();
                    // }
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // index 不为0时 进入自旋
            for (;;) {
                try {
                    // 先判断超时 没超时就继续等着
                    if (!timed)
                        trip.await();
                        // 如果超出指定时间 调用 awaitNanos 超时了释放锁
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                        // 中断异常捕获
                } catch (InterruptedException ie) {
                    // 判断是否被破坏
                    if (g == generation && ! g.broken) {
                        //  private void breakBarrier() {
                        //     generation.broken = true;
                        //     count = parties;
                        //     trip.signalAll();
                        // }
                        breakBarrier();
                        throw ie;
                    } else {
                        // 否则的话中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }
                // 被破坏抛异常
                if (g.broken)
                    throw new BrokenBarrierException();
                // 正常调用 就返回 
                if (g != generation)
                    return index;
                // 超时了而被唤醒的情况 调用 breakBarrier()
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
复制代码


如果被破坏了怎么恢复呢?来看下reset, 源码很简单,break之后重新生成新的实例,对应的会重新初始化count,在dowaitindex==0也调用了nextGeneration,所以说它是可以循环利用的

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
复制代码


结束语

cyclicBarrier源码相对简单一些,下节给大家讲下Phaser,它是增强版的CountDownLatch,它的实现相对更加复杂一点 ~

相关文章
|
4月前
|
Java 开发者
Java面试题:请解释内存泄漏的原因,并说明如何使用Thread类和ExecutorService实现多线程编程,请解释CountDownLatch和CyclicBarrier在并发编程中的用途和区别
Java面试题:请解释内存泄漏的原因,并说明如何使用Thread类和ExecutorService实现多线程编程,请解释CountDownLatch和CyclicBarrier在并发编程中的用途和区别
56 0
|
6天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
20 2
|
3月前
|
JavaScript 前端开发
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
这篇文章主要讨论了axios的使用、原理以及源码分析。 文章中首先回顾了axios的基本用法,包括发送请求、请求拦截器和响应拦截器的使用,以及如何取消请求。接着,作者实现了一个简易版的axios,包括构造函数、请求方法、拦截器的实现等。最后,文章对axios的源码进行了分析,包括目录结构、核心文件axios.js的内容,以及axios实例化过程中的配置合并、拦截器的使用等。
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
|
3月前
|
JavaScript 前端开发
【Vue面试题二十七】、你了解axios的原理吗?有看过它的源码吗?
文章讨论了Vue项目目录结构的设计原则和实践,强调了项目结构清晰的重要性,提出了包括语义一致性、单一入口/出口、就近原则、公共文件的绝对路径引用等原则,并展示了单页面和多页面Vue项目的目录结构示例。
|
2月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
412 37
面试官: 请你手写一份 Call()源码,看完此篇不用担心!
面试官: 请你手写一份 Call()源码,看完此篇不用担心!
|
4月前
|
Java 测试技术 开发者
Java面试题:解释CountDownLatch, CyclicBarrier和Semaphore在并发编程中的使用
Java面试题:解释CountDownLatch, CyclicBarrier和Semaphore在并发编程中的使用
72 11
|
3月前
|
存储 JavaScript 前端开发
JS浅拷贝及面试时手写源码
JS浅拷贝及面试时手写源码
|
4月前
|
存储 安全 Java
Android面试题之ArrayList源码详解
ArrayList是Java中基于数组实现的列表,提供O(1)的索引访问,但插入和删除操作平均时间复杂度为O(n)。默认容量为10,当需要时会通过System.arraycopy扩容。允许存储null,非线程安全。面试常问:List是接口,ArrayList是其实现之一,推荐使用List接口编程以实现更好的灵活性。更多详情见[ArrayList源码](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/ArrayList.java#ArrayList.Node)。
33 2
|
4月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
63 0