Java并发编程笔记之 CountDownLatch闭锁的源码分析

简介: JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 CountDownLatch 是比调用线程的 join 方法更好的选择,CountDownLatch 与 线程的 join 方法区别是什么? 日常开发中经常会遇到需要在主线程中开启多线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景,它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0。

JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 CountDownLatch 是比调用线程的 join 方法更好的选择,CountDownLatch 与 线程的 join 方法区别是什么?

日常开发中经常会遇到需要在主线程中开启多线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景,它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0。另外它还提供了一个countDown方法来操作计数器的值,每调用一次countDown方法计数器都会减1,直到计数器的值减为0时就代表条件已成熟,所有因调用await方法而阻塞的线程都会被唤醒。这就是CountDownLatch的内部机制,看起来很简单,无非就是阻塞一部分线程让其在达到某个条件之后再执行。但是CountDownLatch的应用场景却比较广泛,只要你脑洞够大利用它就可以玩出各种花样。最常见的一个应用场景是开启多个线程同时执行某个任务,等到所有任务都执行完再统计汇总结果。下图动态演示了闭锁阻塞线程的整个过程。

 


在CountDownLatch出现之前一般都是使用线程的join()方法来实现,但是join不够灵活,不能够满足不同场景的需求。接下来我们看看CountDownLatch的原理实现。

 

一.CountDownLatch原理探究

  从CountDownLatch的名字可以猜测内部应该有个计数器,并且这个计数器是递减的,下面就通过源码看看JDK开发组是何时初始化计数器,何时递减的,计数器变为 0 的时候做了什么操作,多个线程是如何通过计时器值实现同步的,首先我们先看看CountDownLatch内部结构,类图如下:

从类图可以知道CountDownLatch内部还是使用AQS实现的,通过下面构造函数初始化计数器的值,可知实际上是把计数器的值赋值给了AQS的state,也就是这里AQS的状态值来表示计数器值。

构造函数源码如下:


public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

   Sync(int count) {
       setState(count);
   }


接下来主要看一下CountDownLatch中几个重要的方法内部是如何调用AQS来实现功能的。

  1.void await()方法,当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:(1)当所有线程都调用了CountDownLatch对象的countDown方法后,

也就是说计时器值为 0 的时候。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。接下来让我们看看await()方法内部是如何调用

AQS的方法的,源码如下:


//CountDownLatch的await()方法
public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}
    //AQS的获取共享资源时候可被中断的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    //如果线程被中断则抛异常
    if (Thread.interrupted())
         throw new InterruptedException();
        //尝试看当前是否计数值为0,为0则直接返回,否者进入AQS的队列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
}

 //sync类实现的AQS的接口
 protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
 }


  从上面代码可以看到await()方法委托sync调用了AQS的acquireSharedInterruptibly方法,该方法的特点是线程获取资源的时候可以被中断,并且获取到的资源是共享资源,这里为什么要调用AQS的这个方法,而不是调用独占锁的accquireInterruptibly方法呢?这是因为这里状态值需要的并不是非 0 即 1 的效果,而是和初始化时候指定的计数器值有关系,比如你初始化的时候计数器值为 8 ,那么state的值应该就有 0 到 8 的状态,而不是只有  0  和  1 的独占效果。

  这里await()方法调用acquireSharedInterruptibly的时候传递的是 1 ,就是说明要获取一个资源,而这里计数器值是资源总数,也就是意味着是让总的资源数减 1 ,acquireSharedInterruptibly内部首先判断如果当前线程被中断了则抛出异常,否则调用sync实现的tryAcquireShared方法看当前状态值(计数器值)是否为 0  ,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly让当前线程阻塞。另外调用tryAcquireShared的方法仅仅是检查当前状态值是不是为 0 ,并没有调用CAS让当前状态值减去 1 。

 

  2.boolean await(long timeout, TimeUnit unit),当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回: (1)当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是计时器值为 0 的时候,这时候返回 true; (2) 设置的 timeout 时间到了,因为超时而返回 false; (3)其它线程调用了当前线程的 interrupt()方法中断了当前线程,当前线程会抛出 InterruptedException 异常后返回。源码如下:


public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}


 

  3.void countDown() 当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做,接下来看一下countDown()方法内部是如何调用AQS的方法的,源码如下:


//CountDownLatch的countDown()方法
    public void countDown() {
       //委托sync调用AQS的方法
        sync.releaseShared(1);
    }
   //AQS的方法
    public final boolean releaseShared(int arg) {
        //调用sync实现的tryReleaseShared
        if (tryReleaseShared(arg)) {
            //AQS的释放资源方法
            doReleaseShared();
            return true;
        }
        return false;
    }


如上面代码可以知道CountDownLatch的countDown()方法是委托sync调用了AQS的releaseShared方法,后者调用了sync 实现的AQS的tryReleaseShared,源码如下:


//syn的方法
protected boolean tryReleaseShared(int releases) {
  //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一并更新到state
  for (;;) {
      int c = getState();

      //如果当前状态值为0则直接返回(1)
      if (c == 0)
          return false;

      //CAS设置计数值减一(2)
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
          return nextc == 0;
  }
}


如上代码可以看到首先获取当前状态值(计数器值),代码(1)如果当前状态值为 0 则直接返回 false ,则countDown()方法直接返回;否则执行代码(2)使用CAS设置计数器减一,CAS失败则循环重试,否则如果当前计数器为 0 则返回 true 。返回 true 后,说明当前线程是最后一个调用countDown()方法的线程,那么该线程除了让计数器减一外,还需要唤醒调用CountDownLatch的await 方法而被阻塞的线程。这里的代码(1)貌似是多余的,其实不然,之所以添加代码 (1) 是为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有代码(1),状态值就会变成负数。

 

  4.long getCount() 获取当前计数器的值,也就是 AQS 的 state 的值,一般在 debug 测试时候使用,源码如下:


public long getCount() {
     return sync.getCount();
}

int getCount() {
     return getState();
}


如上代码可知内部还是调用了 AQS 的 getState 方法来获取 state 的值(计数器当前值)。

 

到目前为止原理理解的差不多了,接下来用一个例子进行讲解CountDownLatch的用法,例子如下:


package com.hjc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by cong on 2018/7/6.
 */
public class CountDownLatchTest {

    private static AtomicInteger id = new AtomicInteger();

    // 创建一个CountDownLatch实例,管理计数为ThreadNum
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();
            }
        });

        Thread threadTwo = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();

            }
        });

        Thread threadThree = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();

            }
        });

        // 启动子线程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
        System.out.println("等待斗地主玩家进场");

        // 等待子线程执行完毕,返回
        countDownLatch.await();

        System.out.println("斗地主玩家已经满人,开始发牌.....");

    }
}


运行结果如下:

 


如上代码,创建了一个 CountDownLatch 实例,因为有两个子线程所以构造函数参数传递为 3,主线程调用 countDownLatch.await()方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown() 方法让 countDownLatch 内部的计数器减一,等所有子线程执行完毕调用 countDown()后计数器会变为 0,这时候主线程的 await()才会返回。

 

如果把上面的代码中Thread.sleep和countDownLatch.await()的代码注释掉,运行几遍,运行结果就可能会出现如下结果,如下图:

 可以看到在注释掉latch.await()这行之后,就不能保证在所有玩家入场后才开始发牌了。


总结:CountDownLatch 与 join 方法的区别,一个区别是调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕,而 CountDownLatch 则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是 CountDownLatch 可以在子线程运行任何时候让 await 方法返回而不一定必须等到线程结束;另外使用线程池来管理线程时候一般都是直接添加 Runable 到线程池这时候就没有办法在调用线程的 join 方法了,countDownLatch 相比 Join 方法让我们对线程同步有更灵活的控制。


目录
相关文章
|
26天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
30天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
64 12
|
27天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
146 2
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
40 1
|
2月前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
57 3
|
3月前
|
Java
Java中的多线程编程:从入门到精通
本文将带你深入了解Java中的多线程编程。我们将从基础概念开始,逐步深入探讨线程的创建、启动、同步和通信等关键知识点。通过阅读本文,你将能够掌握Java多线程编程的基本技能,为进一步学习和应用打下坚实的基础。
|
5月前
|
算法 Java 开发者
Java 编程入门:从零到一的旅程
本文将带领读者开启Java编程之旅,从最基础的语法入手,逐步深入到面向对象的核心概念。通过实例代码演示,我们将一起探索如何定义类和对象、实现继承与多态,并解决常见的编程挑战。无论你是编程新手还是希望巩固基础的开发者,这篇文章都将为你提供有价值的指导和灵感。
|
5月前
|
机器学习/深度学习 Java TensorFlow
深度学习中的图像识别:从理论到实践Java中的多线程编程入门指南
【8月更文挑战第29天】本文将深入探讨深度学习在图像识别领域的应用,从基础理论到实际应用案例,带领读者一步步理解如何利用深度学习技术进行图像识别。我们将通过一个简单的代码示例,展示如何使用Python和TensorFlow库实现一个基本的图像识别模型。无论你是初学者还是有一定经验的开发者,都能从中获得启发和学习。 【8月更文挑战第29天】在Java世界里,线程是程序执行的最小单元,而多线程则是提高程序效率和响应性的关键武器。本文将深入浅出地引导你理解Java多线程的核心概念、创建方法以及同步机制,帮助你解锁并发编程的大门。