【Java基础】线程同步类 CountDownLatch

简介: CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。CountDownLatch 基于AQS构建同步器:AQS - AbstractQueuedSynchronizer ,即抽象的队列同步器,是一种用来**构建锁和同步器**的框架。

关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。
专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。

正好今天项目中用到了CountDownLatch,那我们正好总结一下,通过本文你可以学到什么是CountDownLatch及其原理,CountDownLatch的使用场景等。

在阅读本文前,希望你有以下知识储备:

  1. AQS - AbstractQueuedSynchronizer(待更新)
  2. CAS(Compare And Swap)(待更新)
  3. 【Java基础】volatile关键字

一、导读

我们继续总结学习Java基础知识,温故知新。

二、概览

CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。

CountDownLatch 基于AQS构建同步器:
AQS - AbstractQueuedSynchronizer ,即抽象的队列同步器,是一种用来构建锁和同步器的框架。

使用 LockSupport.park(this); 挂起线程,最后会调用到 UnSafe的park,然后调用到native层。

注意:计数器值递减到0的时候,不能再复原。

2.1 作用

构建锁和同步器,进行线程的等待。

2.2 使用场景

CountDownLatch适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。

CountDownLatch可以理解为并发计数器,主要的使用场景是当一个任务被拆分成多个子任务时,需要等待子任务全部完成后再操作,不然会阻塞线程(当前线程),每完成一个任务计数器会 -1,直到没有。

一般用作多线程倒计时计数器,强制它们等待其他一组任务,计数器的减法是一个不可逆的过程。

eg:人齐了再吃饭,人不齐不能动筷子,大家都坐那等着。

 开会等人,人没到齐的时候,大家也都坐那等着。
 启动顺序,启动顺序的先后及依赖关系。

我们来一段代码:

public class Waitress implements Runnable {
    private CountDownLatch latch;
    private String name;

    public Waitress(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
            System.out.println(sdf.format(new Date()) + " " + name  + "等待顾客");
            latch.await();
            System.out.println(sdf.format(new Date()) + " " + name  + "开始上菜");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

顾客类

public class Customer implements Runnable {
    // 主线程 countdown
    private CountDownLatch mainThreadLatch;

    // 当前线程 countdown
    private CountDownLatch latchThread;
    private String name;

    public Customer(CountDownLatch mainThreadLatch, String name) {
        this.mainThreadLatch = mainThreadLatch;
        this.name = name;
        latchThread = new CountDownLatch(1);
        try {
            // 阻塞当前线程 1s
            latchThread.await(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
            Random random = new Random();

            System.out.println(sdf.format(new Date()) + " " + name + "出发去饭店");
            Thread.sleep((long) (random.nextDouble() * 3000) + 1000);
            System.out.println(sdf.format(new Date()) + " " + name + "到了饭店");
            mainThreadLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试代码

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3);

    List<Thread> threads = new ArrayList<>(3);
    threads.add(new Thread(new Customer(latch, "张三")));
    threads.add(new Thread(new Customer(latch, "李四")));
    threads.add(new Thread(new Customer(latch, "王五")));
    for (Thread thread : threads) {
        thread.start();
    }

    Thread.sleep(100);
    new Thread(new Waitress(latch, "♥小芳♥")).start();
}

2.3 优劣

  • 优点 :代码优雅,不需要对线程池进行操作,将线程池作为 Bean 的情况下有很好的使用场景。
  • 缺点 :需要提前知道线程数量;性能不强。还需要在线程代码块内加上异常判断,否则在 countDown 之前发生异常而没有处理,就会导致主线程永远阻塞在 await。

CountDownLatch 相比于 join() 方法,提供了更多的API,更加灵活。

三、原理

3.1 原理

CountDownLatch就是利用了 AQS - AbstractQueuedSynchronizer 的共享锁来实现。
同时使用 CAS(Compare And Swap) 对计数器进行操作。
再使用 volatile 修饰的变量state维持倒数状态,使多线程共享变量可见。

AQS的数据结构核心就是两个虚拟队列: 同步队列sync queue 和条件队列condition queue,不同的条件会有不同的条件队列。

我们看下源码:

// 1、 创建CountDownLatch并设置计数器值,count代表计数器个数
//(内部是共享锁,本质就是上了几次锁)
public CountDownLatch(int count)

// 2、启动多线程并且调用CountDownLatch实例的countDown()方法
// 每countDown一次,计数器就减一,就是释放了一次共享锁,直到为0全部结束
//调用countDown的线程可以继续执行,不需要等待计数器被减到0,
//只是调用await方法的线程需要等待。
public void countDown()

// 3、 主线程调用 await() 方法,这样主线程的操作就会在这个方法上阻塞,
//直到count值为0,停止阻塞,主线程继续执行
// 在AQS队列里一直等待,直到countDown减到0,才会继续往下执行,
// 使用 LockSupport.park(this);  挂起线程
//方法在倒计数为0之前会阻塞 = 当前线程 =
public void await()

// 等待一定时间
public void await(long timeout, TimeUnit unit)

子类的任务有:

  1. 通过CAS操作维护共享变量state。
  2. 重写资源的获取方式。
  3. 重写资源释放的方式。

CountDownLatch有一个内部类叫做Sync(sɪŋk),它继承了AbstractQueuedSynchronizer类,其中维护了一个整数 state,并且保证了修改state的可见性和原子性。
在创建CountDownLatch实例时,也会创建一个Sync的实例,同时把计数器的值传给Sync实例,具体是这样的:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在 countDown 方法中,只调用了Sync实例的 releaseShared方法,具体是这样的:

public void countDown() {
    sync.releaseShared(1);
}

其中的 releaseShared 方法,先对计数器进行减1操作,如果减1后的计数器为0,唤醒被 await 方法阻塞的所有线程,具体是这样的:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //对计数器进行减一操作
        doReleaseShared();//如果计数器为0,唤醒被await方法阻塞的所有线程
        return true;
    }
    return false;
}

其中的tryReleaseShared方法,先获取当前计数器的值,如果计数器为0时,就直接返回;如果不为0时,使用 CAS(Compare And Swap) 方法对计数器进行减1操作,具体是这样的:

protected boolean tryReleaseShared(int releases) {
    for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。  自旋不断判断是否释放了state同步锁
        int c = getState();//获取当前计数器的值。
        if (c == 0)// 计数器为0时,就直接返回。说明之前已经释放了同步锁,这时候就不需要做任何操作了,因为之前已经做完了
            return false;
            
        // state - 1释放一次同步锁
        int nextc = c-1;
        
        // 通过CAS设置state同步状态,如果成功判断state是否为0,为0代表锁全部释放
        // 被await的程序可以继续往下执行了
        if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
            return nextc == 0;//如果操作成功,返回计数器是否为0
    }
}

3.2 CountDownLatch 是如何阻塞线程的

线程阻塞原理,在await方法中,调用了Sync实例的acquireSharedInterruptibly方法:

// 创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

其中acquireSharedInterruptibly方法,判断计数器是否为0,如果不为0则阻塞当前线程,具体是这样的:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//判断计数器是否为0
        doAcquireSharedInterruptibly(arg);//如果不为0 则阻塞当前线程
}


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 以共享模式添加到等待队列 ,新建节点加入阻塞队列   
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 返回前一个节点
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); //返回锁的state

                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    failed = false;
                    return;
                }
            }
            // 检查并更新未能获取的节点的状态。如果线程应该阻塞,则返回 true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 失败就取消
        if (failed)
            cancelAcquire(node);
    }
}

// 挂起线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

其中tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1,具体是这样的:

 // await判断共享锁是否全部释放,是则从队列中移除,继续往下执行,实现AQS的模板方法
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

相关文章
|
9天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
49 17
|
19天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
5天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
21天前
|
JSON Java Apache
Java基础-常用API-Object类
继承是面向对象编程的重要特性,允许从已有类派生新类。Java采用单继承机制,默认所有类继承自Object类。Object类提供了多个常用方法,如`clone()`用于复制对象,`equals()`判断对象是否相等,`hashCode()`计算哈希码,`toString()`返回对象的字符串表示,`wait()`、`notify()`和`notifyAll()`用于线程同步,`finalize()`在对象被垃圾回收时调用。掌握这些方法有助于更好地理解和使用Java中的对象行为。
|
21天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
21天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
22天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
45 3
|
22天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
119 2
|
30天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
48 6
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####