JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

简介:

JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

概述
CountDownLatch和CyclicBarrier有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。

CountDownLatch
CountDownLatch从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。

CountDownLatch内部的实现主要是依靠AQS的共享模式。当一个线程把CountDownLatch初始化了一个count之后,其他的线程调用await就会阻塞住,直到其他的线程一个一个调用countDown方法进行release操作,把count的值减到0,即把同步锁释放掉,await才会进行下去。

Sync
内部主要还是实现了一个继承自AQS的同步器Sync。Sync源码如下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 构造方法,参数是count的数值
    Sync(int count) {
        // 内部使用state来存储count
        setState(count);
    }

    // 获取count的值
    int getCount() {
        return getState();
    }

    // 尝试获取分享模式同步器
    protected int tryAcquireShared(int acquires) {
        // 判断state的值,如果为0则获取成功,否则获取失败
        // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果
        // 大于0则说明获取成功,如果小于0则说明获取失败
        // 此处不会返回0,因为没有意义
        return (getState() == 0) ? 1 : -1;
    }

    // 释放同步器
    protected boolean tryReleaseShared(int releases) {
        // 自选操作
        for (;;) {
            // 获取state
            int c = getState();
            // 如果state为0,直接返回false
            if (c == 0)
                return false;
            // 计算state-1的结果
            int nextc = c-1;
            // CAS操作将这个值同步到state上
            if (compareAndSetState(c, nextc))
                // 如果同步成功,则判断是否此时state为0
                return nextc == 0;
        }
    }
}

Sync是继承自AQS的同步器,这段代码中值得拿出来讨论的有以下几点:

为什么用state来存储count的数值?
因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的CountDownLatch线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。

为什么tryAcquireShared不会返回0?
首先需要解释下tryAcquireShared在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch内部的tryAcquireShared,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。

为什么tryReleaseShared中的参数不被使用到?
根据这个类的实现方式,我们可以知道tryReleaseShared的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown方法内部调用到了sync.releaseShared方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared中的参数不被使用到的原因是因为参数值固定为1.

构造函数和方法

// 构造方法
public CountDownLatch(int count) {
    // count必须大于0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化Sync
    this.sync = new Sync(count);
}
// 等待获取锁(可被打断)
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 等待获取锁(延迟)
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 计数器降低(释放同步器)
// 每次调用减少1
public void countDown() {
    sync.releaseShared(1);
}

// 获取count
public long getCount() {
    return sync.getCount();
}

// toString
public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

CyclicBarrier
CyclicBarrier从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。

CyclicBarrier并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock以及Condition来完成自己的内部逻辑。

成员变量

// 锁
private final ReentrantLock lock = new ReentrantLock();

// 条件
private final Condition trip = lock.newCondition();

// 线程数
private final int parties;

// 执行完所有线程后执行的Runnable方法,可以为空
private final Runnable barrierCommand;

// 分组
private Generation generation = new Generation();

// 未完成的线程数
private int count;

private static class Generation {
    boolean broken = false;
}

我们可以看到成员变量中有一个很陌生的类Generation,这个是CyclicBarrier内部声明的一个static类,作用是帮助区分线程的分组分代,使得CyclicBarrier可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。

构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;


public CyclicBarrier(int parties) {
    this(parties, null);
}

很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。

核心方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await是CyclicBarrier的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait,我们来看下代码:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 常规的加锁操作,至于为什么要用本地变量操作,
    // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取Generation类
        final Generation g = generation;

        // 查看generation是否是broken,如果是broken的,
        // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法
        // 完成所有线程到达终点(tripped)的目标而只能报错
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程被外部中断需要报错,并且在内部需要将
        // generation的broken置为true来让其他线程能够感知到中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 将线程未完成数减1
        int index = --count;
        // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 如果有预设完成后执行的方法,则执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 此时由于这一个轮回的线程已经全部完成,
                // 所以调用nextGeneration方法开启一个新的轮回
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果此时还有其他的线程未完成,则当前线程开启自旋模式
        for (;;) {
            try {
                if (!timed)
                    // 如果timed为false,trip则阻塞住直到被唤醒
                    trip.await();
                else if (nanos > 0L)
                    // 如果timed为true,则调用awaitNanos设定时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 查看generation是否是broken,如果是broken的抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果g != generation意味着generation
            // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回,
            // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return
            if (g != generation)
                return index;

            // 如果已经超时,则强制打断
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

看完这段核心代码之后我们回头再来反思Generation的意义,我们已经可以大致的给出使用Generation的理由了:

不同于CountDownLatch的实现,CyclicBarrier采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier可以高效的重用。至于为什么broken没有用volatile修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。

总结
CountDownLatch和CyclicBarrier从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:

CountDownLatch实现直接依托于AQS;CyclicBarrier则是借助了ReentrantLock以及Condition
CountDownLatch是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。
由于CyclicBarrier采用的实现方式,相比一次性的CountDownLatch,CyclicBarrier可以多次重复使用
计数方式的不同:CountDownLatch采用累加计数, CyclicBarrier则使用倒数计数
原文地址https://my.oschina.net/bjwzds/blog/3534835

相关文章
|
1月前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
368 100
|
6月前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
464 7
|
6月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
6月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
242 5
|
1月前
|
存储 小程序 Java
热门小程序源码合集:微信抖音小程序源码支持PHP/Java/uni-app完整项目实践指南
小程序已成为企业获客与开发者创业的重要载体。本文详解PHP、Java、uni-app三大技术栈在电商、工具、服务类小程序中的源码应用,提供从开发到部署的全流程指南,并分享选型避坑与商业化落地策略,助力开发者高效构建稳定可扩展项目。
|
5月前
|
JavaScript Java 关系型数据库
家政系统源码,java版本
这是一款基于SpringBoot后端框架、MySQL数据库及Uniapp移动端开发的家政预约上门服务系统。
179 6
家政系统源码,java版本
|
5月前
|
供应链 JavaScript 前端开发
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
338 23
|
4月前
|
存储 安全 Java
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
222 3
|
6月前
|
Java
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
143 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
|
6月前
|
Java 关系型数据库 MySQL
Java汽车租赁系统源码(含数据库脚本)
Java汽车租赁系统源码(含数据库脚本)
132 4