Java Review - 并发编程_ CountDownLatch原理&源码剖析

简介: Java Review - 并发编程_ CountDownLatch原理&源码剖析

cba922bb0f354216abcff72d9dbe4d11.png195d03d17afc4a928bc581f313b01dfe.png

Pre


每日一博 - CountDownLatch使用场景分析以及源码分析


在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。


在CountDownLatch出现之前一般都使用线程的join()方法来实现这一点,但是join方法不够灵活,不能够满足不同场景的需要,所以JDK开发组提供了CountDownLatch这个类,使用CountDownLatch会更优雅.


小Demo

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/19 10:46
 * @mark: show me the code , change the world
 */
public class CountDownLatchTest {
    // 创建一个CountDownLatch实例
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 子线程执行结束,减1
                countDownLatch.countDown();
            }
        });
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 子线程执行结束,减1
                countDownLatch.countDown();
            }
        });
        // 等待子线程执行执行结束  返回
        countDownLatch.await();
        System.out.println( "子线程业务运行Over,主线程继续工作");
        executorService.shutdown();
    }
}


cba922bb0f354216abcff72d9dbe4d11.png


如上代码中,


创建了一个CountDownLatch实例,因为有两个子线程所以构造函数的传参为2。


主线程调用countDownLatch.await()方法后会被阻塞。


子线程执行完毕后调用countDownLatch.countDown()方法让countDownLatch内部的计数器减1


所有子线程执行完毕并调用countDown()方法后计数器会变为0,这时候主线程的await()方法才会返回。


CountDownLatch VS join方法


调用一个子线程的join()方法后,该线程会一直被阻塞直到子线程运行完毕


而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的任何时候让await方法返回而不一定必须等到线程结束


另外,使用线程池来管理线程时一般都是直接添加Runable到线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制


类图关系


0e720dd842224ce68a68d10c4ded0fbf.png


从类图可以看出,CountDownLatch是使用AQS实现的。

通过下面的构造函数, 实际上是把计数器的值赋给了AQS的状态变量state,也就是这里使用AQS的状态值来表示计数器值。


01d3d58595cb46238cff24b3faf45a89.png

 /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }


核心方法&源码解析


接下来分析CountDownLatch中的几个重要的方法,看它们是如何调用AQS来实现功能的。


void await()


当线程调用CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回


当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器的值为0时


其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException异常,然后返回

   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await()方法委托sync调用了AQS的acquireSharedInterruptibly方法

  // AQS获取共享资源时响应中断的方法
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
         // 响应中断 
        if (Thread.interrupted())
            throw new InterruptedException();
       // 查看当前计数器是否为0 ,为0 直接返回,否则进入AQS队列等待  
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


6c50096d58064cd08aec3f1719d04c53.png


86f95847a1804491aee2adffa5b26130.png

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }


由如上代码可知,该方法的特点是线程获取资源时可以被中断,并且获取的资源是共享资源。


acquireSharedInterruptibly首先判断当前线程是否已被中断,若是则抛出异常,否则调用sync实现的tryAcquireShared方法查看当前状态值(计数器值)是否为0,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly方法让当前线程阻塞。


另外可以看到,这里tryAcquireShared传递的arg参数没有被用到,调用tryAcquireShared的方法仅仅是为了检查当前状态值是不是为0,并没有调用CAS让当前状态值减1。


boolean await(long timeout, TimeUnit unit)


当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回


当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器值为0时,这时候会返回true


设置的timeout时间到了,因为超时而返回false


其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常,然后返回

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

void countDown()


线程调用该方法后,计数器的值递减,递减后如果计数器值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做

下面看下countDown()方法是如何调用AQS的方法的。


 /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
      // 委托调用AQS的releaseShared
        sync.releaseShared(1);
    }

AQS的方法

  /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
      // 调用syn实现的tryReleaseShared
        if (tryReleaseShared(arg)) {
          // AQS释放资源的方法
            doReleaseShared();
            return true;
        }
        return false;
    }


1545c4ca401c4e4bbb4ada8aedbe8b3f.png

在如上代码中,releaseShared首先调用了sync实现的AQS的tryReleaseShared方法,代码如下


980d6302f97542539533e9ab562f3524.png


       protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 循环进行CAS, 直到当前线程成功弯沉CAS使计数器值(状态值state)减一 并更新state
            for (;;) {
                int c = getState();
                // 1 如果状态值为0 ,则直接返回
                if (c == 0)
                    return false;
               // 2 使用CAS让计数器减1     
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

如上代码


首先获取当前状态值(计数器值)。


代码(1)判断如果当前状态值为0则直接返回false,从而countDown()方法直接返回


否则执行代码(2)使用CAS将计数器值减1,CAS失败则循环重试,否则如果当前计数器值为0则返回true,返回true说明是最后一个线程调用的countdown方法,那么该线程除了让计数器值减1外,还需要唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程


这里代码(1)貌似是多余的,其实不然,之所以添加代码(1)是为了防止当计数器值为0后,其他线程又调用了countDown方法,如果没有代码(1),状态值就可能会变成负数。


long getCount()


获取当前计数器的值,也就是AQS的state的值,一般在测试时使用该方法

  /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }


在其内部还是调用了AQS的getState方法来获取state的值(计数器当前值)


小结


CountDownLatch是使用AQS实现的。使用AQS的状态变量state来存放计数器的值。


首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。


当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用countdown方法让计数器值递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程。

相关文章
|
1天前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
16 4
Java并发编程学习11-任务执行演示
|
1天前
|
Java
从源码出发:JAVA中对象的比较
从源码出发:JAVA中对象的比较
9 3
|
2天前
|
前端开发 Java 关系型数据库
Java医院绩效考核系统源码B/S架构+springboot三级公立医院绩效考核系统源码 医院综合绩效核算系统源码
作为医院用综合绩效核算系统,系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
18 2
|
2天前
|
缓存 Java 数据库
Java并发编程中的锁优化策略
【5月更文挑战第9天】 在高负载的多线程应用中,Java并发编程的高效性至关重要。本文将探讨几种常见的锁优化技术,旨在提高Java应用程序在并发环境下的性能。我们将从基本的synchronized关键字开始,逐步深入到更高效的Lock接口实现,以及Java 6引入的java.util.concurrent包中的高级工具类。文中还会介绍读写锁(ReadWriteLock)的概念和实现原理,并通过对比分析各自的优势和适用场景,为开发者提供实用的锁优化策略。
3 0
|
2天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
16 0
|
2天前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
10 0
|
20小时前
|
安全 Java
【JAVA进阶篇教学】第六篇:Java线程中状态
【JAVA进阶篇教学】第六篇:Java线程中状态
|
1天前
|
Java
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
10 0
|
1天前
|
设计模式 消息中间件 安全
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
【Java多线程】关于多线程的一些案例 —— 单例模式中的饿汉模式和懒汉模式以及阻塞队列
8 0
|
1天前
|
安全 Java 程序员
【Java多线程】面试常考——锁策略、synchronized的锁升级优化过程以及CAS(Compare and swap)
【Java多线程】面试常考——锁策略、synchronized的锁升级优化过程以及CAS(Compare and swap)
5 0