CountDownLatch源码硬核解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: CountDownLatch源码硬核解析

前言


对于并发执行,Java中的CountDownLatch是一个重要的类,简单理解, CountDownLatchcount down是倒数的意思,latch则是“门闩”的含义。在数量倒数到0的时候,打开“门闩”, 一起走,否则都等待在“门闩”的地方。

为了更好的理解CountDownLatch这个类,本文通过例子和源码带领大家深入解析这个类的原理。


介绍和使用


例子


我们先通过一个例子快速理解下CountDownLatch的妙处。

最近LOL S12赛如火如荼举行,比如我们玩王者荣耀的时候,10个万玩家登入游戏,每个玩家的网速可能不一样,只有每个人进度条走完,才会一起来到王者峡谷,网速快的要等网速慢的。我们通过例子模拟下这个过程。

@Slf4j(topic = "a.CountDownLatchTest")
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个倒时器,默认10个数量
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService service = Executors.newFixedThreadPool(10);
        // 设置进度数据
        String[] personProcess = new String[10];
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int finalJ = i;
            service.submit(() -> {
                // 模拟10个人的进度条
                for (int j = 0; j <= 100; j++) {
                    // 模拟网速快慢,随机生成
                    try {
                        Thread.sleep(random.nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 设置进度数据
                    personProcess[finalJ] = j + "%";
                   log.info("{}", Arrays.toString(personProcess));
                }
                // 运行结束,倒时器 - 1
                latch.countDown();
            });
        }
        // 打开"阀门"
        latch.await();
       log.info("王者峡谷到了");
        service.shutdown();
    }
}

运行结果:


1671196730425.jpg


概述


CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。

构造器:

  • public CountDownLatch(int count):设置倒数器需要倒数的数量

常用API:

  • public void await() throws InterruptedException:调用await()方法的线程会被挂起,等待直到count值为0再继续执行。
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException:同await(),若等待timeout时长后,count值还是没有变为0,不再等待,继续执行。时间单位如下常用的毫秒、天、小时、微秒、分钟、纳秒、秒。
  • public void countDown(): count值递减1
  • public long getCount():获取当前count值

常见使用场景:

一个程序中有N个任务在执行,我们可以创建值为N的CountDownLatch,当每个任务完成后,调用一下countDown()方法进行递减count值,再在主线程中使用await()方法等待任务执行完成,主线程继续执行。


实现思路


通过前面的例子和介绍我们知道CountDownLatch的大致使用流程:

  1. 创建CountDownLatch并设置计数器值。
  2. 启动多线程并且调用CountDownLatch实例的countDown()方法。
  3. 主线程调用 await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,count值为0,停止阻塞,主线程继续执行。

不妨我们先思考下,它是怎么实现的呢?我们可以问自己几个问题?

  1. 如何做到可以让主线程阻塞等待在那里?是不是可以调用LockSupport.park()方法进行阻塞。
  2. 那么什么时候该阻塞呢?我们需要有个变量,比如state, 如果state大于0,就阻塞主线程。
  3. 那么什么时候该唤醒呢,又如何唤醒呢?如果任务执行完成后,我们让state 减去1,也就是调用countDown()方法,如果发现state是0,那么就调用LockSupport.unpark()唤醒此前阻塞的地方,继续执行。

是不是很熟悉,这就是我们的AQS共享模式的实现原理啊,不了解AQS共享模式的可以参考本篇文章:深入浅出理解Java并发AQS的共享锁模式

我们把思路理清楚后,直接看CountDownLatch的源码。


源码解析


类结构图


1671196747272.jpg


以上是CountDownLatch的类结构图,

  • SyncCountDownLatch的内部类,被成员变量sync持有。
  • Sync继承了AbstractQueuedSynchronizer,也就是我们大名鼎鼎的AQS。


await() 实现原理


  1. 线程调用 await()会阻塞等待其他线程完成任务
// CountDownLatch#await
public void await() throws InterruptedException {
    // 调用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法
    sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程是否被打断,抛出打断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取共享锁
    // 条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源
    // 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用
    if (tryAcquireShared(arg) < 0)
        // 阻塞当前线程的逻辑
        doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
  1. doAcquireSharedInterruptibly()方法是实现线程阻塞的核心逻辑
// AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 前驱节点时头节点就可以尝试获取锁
            if (p == head) {
                // 再次尝试获取锁,获取成功返回 1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取锁成功,设置当前节点为 head 节点,并且向后传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 阻塞在这里
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 阻塞线程被中断后抛出异常,进入取消节点的逻辑
        if (failed)
            cancelAcquire(node);
    }
}
  1. parkAndCheckInterrupt()方法中会进行阻塞操作
private final boolean parkAndCheckInterrupt() {
      // 阻塞线程
        LockSupport.park(this);
        return Thread.interrupted();
    }


countDown()实现原理


  1. 任务结束调用 countDown() 完成计数器减一(释放锁)的操作
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    // 尝试释放共享锁
    if (tryReleaseShared(arg)) {
        // 释放锁成功开始唤醒阻塞节点
        doReleaseShared();
        return true;
    }
    return false;
}
  1. 调用tryReleaseShared()方法尝试释放锁,true表示state等于0,去唤醒阻塞线程。
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        // 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false
        if (c == 0)
            return false;
        // 计数器减一
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            // 计数器为 0 时返回 true
            return nextc == 0;
    }
}
  1. 调用doReleaseShared()唤醒阻塞的节点
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 判断队列是否是空队列
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点的状态为 signal,说明后继节点没有被唤醒过
            if (ws == Node.SIGNAL) {
                // cas 设置头节点的状态为 0,设置失败继续自旋
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,
        // 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
        if (h == head)
            break;
    }
}


目录
相关文章
|
9天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
35 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
62 0
|
10天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
22天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
40 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
113 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
算法 Java 程序员
Map - TreeSet & TreeMap 源码解析
Map - TreeSet & TreeMap 源码解析
34 0

推荐镜像

更多
下一篇
无影云桌面