JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上

简介: JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上



一、CountDownLatch:

1、什么是 CountDownLatch:

CountDownLatch,闭锁,就是一个基于 AQS 共享模式的同步计数器,它内部的方法都是围绕 AQS 实现的。主要作用是使一个或一组线程在其他线程执行完毕之前,一直处于等待状态,直到其他线程执行完成后再继续执行。

CountDownLatch 利用 AQS 的 state 变量充当计数器(由 volatile 修饰并使用 CAS 进行更新的),计数器的初始值就是线程的数量,每当一个线程执行完成,计数器的值就会减一,当计数器的值为 0 时,表示所有的线程都已经完成任务了,那么接下来就唤醒在 CountDownLatch 上等待的线程执行后面的任务。

那么当计数器的值为 0 时,主线程是如何被唤醒的呢?这就要从 CountDownLatch 的工作流程来说明了,CountDownLatch 的工作流程可以看成在一开始只在 CLH 队列中放入一个主线程,然后不停的唤醒,唤醒之后如果发现 state 还是不为0,则继续等待。而主线程什么时候会被唤醒呢?

当每个子线程执行完毕的时候,会调用 countDown() 并基于 CAS 将计数器 state 的值减一,减一成功释放资源后,就会调用 unparkSuccessor() 唤醒主线程,当所有的子线程都执行完了,也就是 state 为 0 时,这时候主线程被唤醒之后就可以继续执行了。

state 被减成了 0 之后,就无法继续使用这个 CountDownLatch 了,需要重新 new 一个,因为 state 的数量只有在初始化 CountDownLatch 的时候才可以设置,这也是 CountDownLatch 不可重用的原因。

2、CountDownLatch 的源码简单说明:

从代码层面上来看,CountDownLatch 基于内部类 Sync 实现,而 Sync 继承自 AQS。CountDownLatch 最主要有两个方法:await()countDown()

  • await(): 调用该方法的线程会被挂起,直到 CountDownLatch 计数器的值为 0 才继续执行,底层使用的是 AQS 的 tryAcquireShared()
  • countDown(): 用于减少计数器的数量,如果计数减为 0 的话,就会唤醒主线程,底层使用的是 AQS 的 releaseShared()

countDown() 方法详细流程:

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、CyclicBarrier:

1、什么是CyclicBarrier:

CyclicBarrier,循环栅栏,通过 CyclicBarrier 可以实现一组线程之间的相互等待,当所有线程都到达屏障点之后再执行后续的操作。通过 await() 方法可以实现等待,当最后一个线程执行完,会使得所有在相应 CyclicBarrier 实例上的等待的线程被唤醒,而最后一个线程自身不会被暂停。

CyclicBarrier 没有像 CountDownLatchReentrantLock 使用 AQS 的 state 变量,它是直接借助 ReentrantLock 加上 Condition 等待唤醒的功能进而实现的。在构建 CyclicBarrier 的时候,传入的值会赋值给 CyclicBarrier 内部维护的变量 count,同时也会赋值给 parties 变量(这是可以复用的关键)。

线程调用 await() 表示线程已经到达栅栏,每次调用 await() 时,会将 count 减一,操作 count 值是直接使用 ReentrantLock 来保证线程安全性的,如果 count 不为 0,则添加到 condition 队列中,如果 count 等于 0,则把节点从 condition 队列中移除并添加到 AQS 队列中进行全部唤醒,并且将 parties 的值重新赋值给 count 从而实现复用。

2、CyclicBarrier 的源码分析:

(1)成员变量:

//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
//静态内部类Generation
private static class Generation {
  boolean broken = false;
}

CyclicBarrier 是通过独占锁实现的,底层包含了 “ReentrantLock 对象 lock” 和 “Condition 对象 trip”,通过条件队列 trip 来对线程进行阻塞的,并且其内部维护了两个 int 型的变量 parties 和 count:

  • parties 表示每次拦截的线程数,该值在构造时进行赋值,用于实现 CyclicBarrier 的复用;
  • count 是内部计数器,它的初始值和 parties 相同,以后随着每次 await 方法的调用而减 1,直到减为 0 就将所有线程唤醒。

CyclicBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,利用它可以实现循环等待,当 count 减为 0 会将所有阻塞的线程唤醒,并设置成下一代。

barrierCommand 表示换代前执行的任务,在唤醒所有线程前可以通过 barrierCommand 来执行指定的任务

(2)await() 方法:

CyclicBarrier 类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时。

可以看到不管是定时等待还是非定时等待,它们都调用了 dowait() 方法,只不过是传入的参数不同而已,下面我们就来看看 dowait() 方法都做了些什么。

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  // 显示锁
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //检查当前栅栏是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //检查当前线程是否被中断
    if (Thread.interrupted()) {
      //如果当前线程被中断会做以下三件事
      //1.打翻当前栅栏
      //2.唤醒拦截的所有线程
      //3.抛出中断异常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都将计数器的值减1
    int index = --count;
    //计数器的值减为0则需唤醒所有线程并转换到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //唤醒所有线程前先执行指定的任务
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //唤醒所有线程并转到下一代
        nextGeneration();
        return 0;
      } finally {
        //确保在任务未成功执行时能将所有线程唤醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }
    //如果计数器不为0则执行此循环
    for (;;) {
      try {
        //根据传入的参数来决定是定时等待还是非定时等待
        if (!timed) {
          trip.await();
        }else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
          Thread.currentThread().interrupt();
        }
      }
      //如果线程因为打翻栅栏操作而被唤醒则抛出异常
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      //如果线程因为换代操作而被唤醒则返回计数器的值
      if (g != generation) {
        return index;
      }
      //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

上面执行的代码相对比较容易看懂,我们再来看一下执行流程:

  • 执行 dowait() 方法时,先获得显示锁,判断当前线程状态是否被中断,如果是,则执行 breakBarrier() 方法,唤醒之前阻塞的所有线程,并将计数器重置,否则,往下执行;
  • 计数器 count 减 1,如果 count == 0,表示最后一个线程达到栅栏,接着执行之前指定的 Runnable 接口,同时执行 nextGeneration() 方法进入下一代;
  • 否则,进入自旋,判断当前线程是进入定时等待还是非定时等待,如果在等待过程中被中断,执行 breakBarrier() 方法,唤醒之前阻塞的所有线程;
  • 判断是否是因为执行 breakBarrier() 方法而被唤醒,如果是,则抛出异常;
  • 判断是否是正常的换代操作而被唤醒,如果是,则返回计数器的值;
  • 判断是否是超时而被唤醒,如果是,则唤醒之前阻塞的所有线程,并抛出异常;
  • 释放锁。
相关文章
|
10天前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
31 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
17天前
|
Java C++
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
26 0
|
1月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
6天前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
1月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
117 6
【Java学习】多线程&JUC万字超详解
|
8天前
多线程通信和同步的方式有哪些?
【10月更文挑战第6天】
48 0
|
2月前
|
开发者 C# UED
WPF与多媒体:解锁音频视频播放新姿势——从界面设计到代码实践,全方位教你如何在WPF应用中集成流畅的多媒体功能
【8月更文挑战第31天】本文以随笔形式介绍了如何在WPF应用中集成音频和视频播放功能。通过使用MediaElement控件,开发者能轻松创建多媒体应用程序。文章详细展示了从创建WPF项目到设计UI及实现媒体控制逻辑的过程,并提供了完整的示例代码。此外,还介绍了如何添加进度条等额外功能以增强用户体验。希望本文能为WPF开发者提供实用的技术指导与灵感。
102 0
|
2月前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
56 0
|
2月前
|
Java API 调度
JUC线程池: FutureTask详解
总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
32 0
|
13天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
32 1
C++ 多线程之初识多线程