JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

简介:

JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

概述
CountDownLatch和CyclicBarrier有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。

CountDownLatch
CountDownLatch从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。

CountDownLatch内部的实现主要是依靠AQS的共享模式。当一个线程把CountDownLatch初始化了一个count之后,其他的线程调用await就会阻塞住,直到其他的线程一个一个调用countDown方法进行release操作,把count的值减到0,即把同步锁释放掉,await才会进行下去。

Sync
内部主要还是实现了一个继承自AQS的同步器Sync。Sync源码如下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 构造方法,参数是count的数值
    Sync(int count) {
        // 内部使用state来存储count
        setState(count);
    }

    // 获取count的值
    int getCount() {
        return getState();
    }

    // 尝试获取分享模式同步器
    protected int tryAcquireShared(int acquires) {
        // 判断state的值,如果为0则获取成功,否则获取失败
        // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果
        // 大于0则说明获取成功,如果小于0则说明获取失败
        // 此处不会返回0,因为没有意义
        return (getState() == 0) ? 1 : -1;
    }

    // 释放同步器
    protected boolean tryReleaseShared(int releases) {
        // 自选操作
        for (;;) {
            // 获取state
            int c = getState();
            // 如果state为0,直接返回false
            if (c == 0)
                return false;
            // 计算state-1的结果
            int nextc = c-1;
            // CAS操作将这个值同步到state上
            if (compareAndSetState(c, nextc))
                // 如果同步成功,则判断是否此时state为0
                return nextc == 0;
        }
    }
}

Sync是继承自AQS的同步器,这段代码中值得拿出来讨论的有以下几点:

为什么用state来存储count的数值?
因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的CountDownLatch线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。

为什么tryAcquireShared不会返回0?
首先需要解释下tryAcquireShared在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch内部的tryAcquireShared,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。

为什么tryReleaseShared中的参数不被使用到?
根据这个类的实现方式,我们可以知道tryReleaseShared的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown方法内部调用到了sync.releaseShared方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared中的参数不被使用到的原因是因为参数值固定为1.

构造函数和方法

// 构造方法
public CountDownLatch(int count) {
    // count必须大于0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化Sync
    this.sync = new Sync(count);
}
// 等待获取锁(可被打断)
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 等待获取锁(延迟)
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 计数器降低(释放同步器)
// 每次调用减少1
public void countDown() {
    sync.releaseShared(1);
}

// 获取count
public long getCount() {
    return sync.getCount();
}

// toString
public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

CyclicBarrier
CyclicBarrier从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。

CyclicBarrier并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock以及Condition来完成自己的内部逻辑。

成员变量

// 锁
private final ReentrantLock lock = new ReentrantLock();

// 条件
private final Condition trip = lock.newCondition();

// 线程数
private final int parties;

// 执行完所有线程后执行的Runnable方法,可以为空
private final Runnable barrierCommand;

// 分组
private Generation generation = new Generation();

// 未完成的线程数
private int count;

private static class Generation {
    boolean broken = false;
}

我们可以看到成员变量中有一个很陌生的类Generation,这个是CyclicBarrier内部声明的一个static类,作用是帮助区分线程的分组分代,使得CyclicBarrier可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。

构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;


public CyclicBarrier(int parties) {
    this(parties, null);
}

很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。

核心方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await是CyclicBarrier的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait,我们来看下代码:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 常规的加锁操作,至于为什么要用本地变量操作,
    // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取Generation类
        final Generation g = generation;

        // 查看generation是否是broken,如果是broken的,
        // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法
        // 完成所有线程到达终点(tripped)的目标而只能报错
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程被外部中断需要报错,并且在内部需要将
        // generation的broken置为true来让其他线程能够感知到中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 将线程未完成数减1
        int index = --count;
        // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 如果有预设完成后执行的方法,则执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 此时由于这一个轮回的线程已经全部完成,
                // 所以调用nextGeneration方法开启一个新的轮回
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果此时还有其他的线程未完成,则当前线程开启自旋模式
        for (;;) {
            try {
                if (!timed)
                    // 如果timed为false,trip则阻塞住直到被唤醒
                    trip.await();
                else if (nanos > 0L)
                    // 如果timed为true,则调用awaitNanos设定时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 查看generation是否是broken,如果是broken的抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果g != generation意味着generation
            // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回,
            // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return
            if (g != generation)
                return index;

            // 如果已经超时,则强制打断
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

看完这段核心代码之后我们回头再来反思Generation的意义,我们已经可以大致的给出使用Generation的理由了:

不同于CountDownLatch的实现,CyclicBarrier采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier可以高效的重用。至于为什么broken没有用volatile修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。

总结
CountDownLatch和CyclicBarrier从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:

CountDownLatch实现直接依托于AQS;CyclicBarrier则是借助了ReentrantLock以及Condition
CountDownLatch是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。
由于CyclicBarrier采用的实现方式,相比一次性的CountDownLatch,CyclicBarrier可以多次重复使用
计数方式的不同:CountDownLatch采用累加计数, CyclicBarrier则使用倒数计数
原文地址https://my.oschina.net/bjwzds/blog/3534835

相关文章
|
12天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
45 7
|
23天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
28 4
|
29天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
67 2
|
4天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
40 13
|
17天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
12天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
1月前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
1月前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
40 3
|
14天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
1月前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
84 3