CountDownLatch实现原理全面解析

简介: CountDownLatch是一个同步工具类,用来协调多个线程之间的同步(即:用于线程之间的通信而不是互斥)。它允许一个或多个线程进入等待状态,直到其他线程执行完毕后,这些等待的线程才继续执行。

简介

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步(即:用于线程之间的通信而不是互斥)。它允许一个或多个线程进入等待状态,直到其他线程执行完毕后,这些等待的线程才继续执行。

CountDownLatch通过一个计数器来实现,其中维护了一个count变量和操作该变量的两个主要方法:

  • await()方法:线程调用await()方法,会使调用该方法的线程进入阻塞状态,并将其加入到阻塞队列中。
  • countDown()方法:线程调用countDown()方法,会将CountDownLatch中count的值-1。当count变量的值递减为0,会唤醒阻塞队列中调用await()方法的线程继续执行业务处理。

应用场景

CountDownLatch是一种非常实用的并发控制工具,它的主要应用场景:

  • 主线程等待多个子线程完成任务处理。如:主线程等待其他线程各自完成任务处理后,再继续执行。

  • 实现多个线程开始执行任务处理的最大并行性(注意:是并行而非并发)。如:多个线程需要在同一时刻开始执行任务处理,可以通过如下方式实现:

    1)初始化一个的CountDownLatch变量(计数器的初始化值为1)。

    2)需要在同一时刻执行任务处理的所有线程调用CountDownLatch.await()方法进入阻塞状态。

    3)主线程调用CountDownLatch.countDown()方法将计数器-1(此时计数器的值为0),唤醒所有调用CountDownLatch.await()方法进入阻塞状态的线程开始执行任务处理。

实现原理

CountDownLatch中定义了一个Sync类型的变量和操作该变量的方法。

源码如下:

// Sync类型的同步变量
private final Sync sync;
// 构造函数,用于初始化CountDownLatch计数器
public CountDownLatch(int count) {
   ...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程被其他线程中断。
public void await() throws InterruptedException {
   ...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程等待超时或者被其他线程中断。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
   ...}
// 递减AQS中的state(计数器)值,如果state的值递减为0,则唤醒调用await()方法进入阻塞的线程。
public void countDown() {
   ...}
// 返回state的值。
public long getCount() {
   ...}
// 返回标识CountDownLatch及其计数器值的字符串。
public String toString() {
   ...}

其中,最重要的是sync类型的变量、await()和countDown()方法。

Sync

Sync是CountDownLatch的静态内部类器,它继承了
AbstractQueuedSynchronizer(AQS),主要用于CountDownLatch的同步状态,创建CountDownLatch时进行初始化。

CountDownLatch构造函数:

public CountDownLatch(int count) {
   
    // 如果传入的count值小于0,则抛出IllegalArgumentException异常
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化Sync
    this.sync = new Sync(count);
}

初始化Sync时,将传入的count参数值赋值给AQS的同步状态state,state是一个volatile修饰的int值,一个线程修改了state值,其他线程能够立刻感知,从而保证state值在并发场景下的可见性。

同时,Sync实现了AQS的tryAcquireShared()和tryReleaseShared()方法:

java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
// 尝试获取共享资源
protected int tryAcquireShared(int acquires) {
   
    /**
     * 用于根据state(计数器)的值来尝试获取共享资源:
     *   state的值为0,返回1,表示可以获取共享资源。
     *   state的值不为0,返回-1,表示无法获取共享资源。
     */
    return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享资源(AQS)
protected boolean tryReleaseShared(int releases) {
   
    // Decrement count; signal when transition to zero
    for (;;) {
   
        // 获取当前state的值
        int c = getState();
        // 如果state的值为0,则返回false(即:没有需要释放的资源)
        if (c == 0)
            return false;
        // 如果state的值大于0,则将state的值-1,并通过CAS的方式更新state的最新值
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            /** 
             * 返回资源释放结果:
             *   释放资源后state的值为0,则返回true,表示可以唤醒调用await()方法进入阻塞的线程。
             *   释放资源后state的值不为0,则返回false,表示继续阻塞调用await()方法的线程,直到state的值被减为0。
             */
            return nextc == 0;
    }
}

await方法

CountDownLatch通过CountDownLatch#await方法调用CountDownLatch.Sync#tryAcquireShared方法尝试获取共享资源:

  • 获取到共享资源,则唤醒调用await()方法的线程执行业务处理。
  • 获取不到共享资源,则继续阻塞调用await()方法的线程,直到state的值递减为0(即:其他线程释放完共享资源)。

CountDownLatch#await方法源码解析:

// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
   
    // 获取共享资源(可中断)
    sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
// 获取共享资源(AQS)
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
   
    // 如果线程被其他线程中断,则抛出InterruptedException异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 具体由继承AQS的Sync的tryAcquireShared()方法实现
    if (tryAcquireShared(arg) < 0)
        // 如果获取共享资源锁失败,则将当前线程封装成Node节点追加到CLH队列的末尾,等待被唤醒(即:进入阻塞)
        doAcquireSharedInterruptibly(arg);
}

其中,
doAcquireSharedInterruptibly()方法源码解析请移步主页查阅->「一文搞懂」AQS(抽象队列同步器)实现原理及源码解析。

countDown方法

CountDownLatch通过CountDownLatch#countDown方法调用CountDownLatch.Sync#tryReleaseShared方法尝试释放共享资源:

  • 如果释放某个共享资源后state的值为0,则唤醒调用await()方法的线程执行业务处理。
  • 如果释放某个共享资源后state的值不为0,则继续阻塞调用await()方法的线程,直到state的值被减为0。

CountDownLatch#countDown方法源码解析:

// java.util.concurrent.CountDownLatch#countDown
public void countDown() {
   
    // 释放共享资源(数量为1)
    sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 释放共享资源(AQS)
public final boolean releaseShared(int arg) {
   
    // 具体由继承AQS的Sync的tryReleaseShared()方法实现
    if (tryReleaseShared(arg)) {
   
        // 唤醒后继节点
        doReleaseShared();
        return true;
    }
    return false;
}

使用示例

主线程等待子线程完成处理

/**
 * @author 南秋同学
 * 主线程等待多个子线程完成任务处理
 */
@Slf4j
public class CountDownLatchExample {
   
    @SneakyThrows
    public static void main(String[] args) {
   
        // 初始化一个的CountDownLatch变量(计数器的初始化值为5)
        CountDownLatch cdl = new CountDownLatch(5);
        // 初始化一个固定大小的线程池
        ExecutorService service = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5 ; i++){
   
            // 创建Runnable线程
            Runnable runnable = new Runnable() {
   
                @SneakyThrows
                @Override
                public void run() {
   
                    log.info("子线程-{}开始执行...", Thread.currentThread().getName());
                    Thread.sleep((long) (Math.random() * 10000));
                    log.info("子线程-{}执行完成", Thread.currentThread().getName());
                    cdl.countDown();
                }
            };
            service.execute(runnable);
        }
        log.info("主线程-{}等待所有子线程执行完成...",Thread.currentThread().getName());
        cdl.await();
        log.info("所有子线程执行完成,开始执行主线程-{}",Thread.currentThread().getName());
    }
}

执行结果:

14:13:01.299 [pool-1-thread-3]  - 子线程-pool-1-thread-3开始执行...
14:13:01.299 [pool-1-thread-2]  - 子线程-pool-1-thread-2开始执行...
14:13:01.299 [main]  - 主线程-main等待所有子线程执行完成...
14:13:01.299 [pool-1-thread-1]  - 子线程-pool-1-thread-1开始执行...
14:13:01.299 [pool-1-thread-5]  - 子线程-pool-1-thread-5开始执行...
14:13:01.299 [pool-1-thread-4]  - 子线程-pool-1-thread-4开始执行...
14:13:02.739 [pool-1-thread-3]  - 子线程-pool-1-thread-3执行完成
14:13:03.792 [pool-1-thread-1]  - 子线程-pool-1-thread-1执行完成
14:13:04.752 [pool-1-thread-5]  - 子线程-pool-1-thread-5执行完成
14:13:07.761 [pool-1-thread-4]  - 子线程-pool-1-thread-4执行完成
14:13:10.384 [pool-1-thread-2]  - 子线程-pool-1-thread-2执行完成
14:13:10.385 [main]  - 所有子线程执行完成,开始执行主线程-main

多线程最大并行处理

/**
 * @author 南秋同学
 * 实现多个线程开始执行任务处理的最大并行性
 */
@Slf4j
public class CountDownLatchExample {
   
    @SneakyThrows
    public static void main(String[] args) {
   
        // 初始化一个的CountDownLatch变量(计数器的初始化值为1)
        CountDownLatch referee = new CountDownLatch(1);
        // 初始化一个的CountDownLatch变量(计数器的初始化值为5)
        CountDownLatch sportsman = new CountDownLatch(5);
        // 初始化一个固定大小的线程池
        ExecutorService service = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5 ; i++){
   
            // 创建Runnable线程
            Runnable runnable = new Runnable() {
   
                @SneakyThrows
                @Override
                public void run() {
   
                    log.info("运动员-{},等待裁判发布开始口令", Thread.currentThread().getName());
                    referee.await();
                    log.info("运动员-{},收到裁判发布的开始口令,起跑...", Thread.currentThread().getName());
                    Thread.sleep((long) (Math.random() * 10000));
                    log.info("运动员-{}到达终点", Thread.currentThread().getName());
                    sportsman.countDown();
                }
            };
            service.execute(runnable);
        }
        log.info("裁判-{}准备发布开始口令...",Thread.currentThread().getName());
        Thread.sleep((long) (Math.random() * 10000));
        referee.countDown();
        log.info("裁判-{}已经发布开始口令,等待所有选手达到终点...",Thread.currentThread().getName());
        sportsman.await();
        log.info("所有运动员达到终点,裁判-{}开始计分",Thread.currentThread().getName());
    }
}

执行结果:

13:56:14.683 [pool-1-thread-3]  - 运动员-pool-1-thread-3,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-2]  - 运动员-pool-1-thread-2,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-5]  - 运动员-pool-1-thread-5,等待裁判发布开始口令
13:56:14.683 [main]  - 裁判-main准备发布开始口令...
13:56:14.683 [pool-1-thread-1]  - 运动员-pool-1-thread-1,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-4]  - 运动员-pool-1-thread-4,等待裁判发布开始口令
13:56:18.205 [main]  - 裁判-main已经发布开始口令,等待所有选手达到终点...
13:56:18.205 [pool-1-thread-2]  - 运动员-pool-1-thread-2,收到裁判发布的开始口令,起跑...
13:56:18.205 [pool-1-thread-3]  - 运动员-pool-1-thread-3,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-5]  - 运动员-pool-1-thread-5,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-4]  - 运动员-pool-1-thread-4,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-1]  - 运动员-pool-1-thread-1,收到裁判发布的开始口令,起跑...
13:56:22.110 [pool-1-thread-4]  - 运动员-pool-1-thread-4到达终点
13:56:23.866 [pool-1-thread-1]  - 运动员-pool-1-thread-1到达终点
13:56:26.803 [pool-1-thread-3]  - 运动员-pool-1-thread-3到达终点
13:56:28.019 [pool-1-thread-5]  - 运动员-pool-1-thread-5到达终点
13:56:28.178 [pool-1-thread-2]  - 运动员-pool-1-thread-2到达终点
13:56:28.179 [main]  - 所有运动员达到终点,裁判-main开始计分
相关文章
|
3月前
|
算法 关系型数据库 MySQL
【MySQL 解析】数据库的乐观锁和悲观锁实现原理
【1月更文挑战第11天】【MySQL 解析】数据库的乐观锁和悲观锁实现原理
|
1月前
|
存储 缓存 编译器
Go语言解析Tag:深入探究实现原理
【2月更文挑战第20天】
42 2
|
1月前
|
算法 Java 调度
Semaphore实现原理全面解析
Semaphore(信号量)是一个同步工具类,通过Semaphore可以控制同时访问共享资源的线程个数。
|
2月前
|
存储 安全 Java
Go Slice的底层实现原理深度解析
在Go语言的世界里,切片(Slice)是一种极其重要的数据结构,它以其灵活性和高效性在众多编程场景中扮演着核心角色。本文将深入探讨Go切片的底层实现原理,通过实例和源码分析,带你领略Go语言设计之美。
|
2月前
|
程序员 测试技术 Python
Python中的装饰器实现原理解析
在Python编程中,装饰器是一种强大的工具,通过装饰器可以在不改变函数源码的情况下增加额外的功能。本文将深入探讨Python中装饰器的实现原理,帮助读者更好地理解和运用装饰器。
|
3月前
|
NoSQL Redis
Redis进阶- Redisson分布式锁实现原理及源码解析
Redis进阶- Redisson分布式锁实现原理及源码解析
76 0
|
7月前
|
JSON 数据格式
SAP 电商云 Spartacus UI SiteContextParamsService 的实现原理解析
SAP 电商云 Spartacus UI SiteContextParamsService 的实现原理解析
57 0
|
9月前
|
存储 Java 数据安全/隐私保护
ThreadLocal的实现原理&源码解析
ThreadLocal是Java中的一个线程封闭机制,它提供了一种线程局部变量的解决方案,可以使每个线程都拥有自己独立的变量副本,互不干扰。
67 0
BXA
|
11月前
|
存储 弹性计算 Kubernetes
解析Kubernetes的设计与实现原理
Kubernetes 是一种用于自动化部署、扩展和管理容器化应用程序的开源平台。它通过提供跨主机集群的容器协调和管理服务,实现了高可用性和弹性伸缩的容器集群管理。
BXA
134 0
|
1天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)

推荐镜像

更多