【线程】并发流程控制的同步工具-CountDownLatch

简介: 通常我们都会利用并行来优化性能,但是对于串行化的业务,可能需要按顺序执行,那我们怎么才能处理呢?

前言

大家好,我是小郭,前面我们学习了利用Semaphore来防止多线程同时操作一个资源,通常我们都会利用并行来优化性能,但是对于串行化的业务,可能需要按顺序执行,那我们怎么才能处理呢?今天我们来学习另一个并发流程控制的同步工具CountDownLatch。

了解CountDownLatch

首先,CountDownLatch是一种并发流程控制的同步工具。

主要的作用是等待多个线程同时完成任务之后,再继续完成主线程任务。

简单点可以理解为,几个小伙伴一起到火锅店聚餐,人到齐了,火锅店才可以开饭。

思考问题:

  1. CountDownLatch 底层原理是什么,他是否可以代替wait / notify?
  2. CountDwonLatch 业务场景有哪些?
  3. 一次可以唤醒多个任务吗?

主要参数与方法

//减少锁存器的计数,如果计数达到零,则释放所有等待线程。 
//计数器 
public void countDown() { 
    sync.releaseShared(1); 
} 
//导致当前线程等待,直到锁存器递减至零为止,除非该线程被中断。 
//火锅店调用await的线程,count为0才能继续执行 
public void await() throws InterruptedException { 
    sync.acquireSharedInterruptibly(1); 
}

构造方法

//count 数量,理解为小伙伴的个数 
public CountDownLatch(int count) { 
    if (count < 0) throw new IllegalArgumentException("count < 0"); 
        this.sync = new Sync(count); 
} 
//获取剩余的数量 
public long getCount() { 
    return sync.getCount(); 
}

CountDownLatch底层实现原理

我们可以看出countDown()是CountDownLatch的核心方法,我来看下他的具体实现。

网络异常,图片无法展示
|

CountDownLatch来时继承AQS的共享模式来完成其的实现,从前面的学习得出AQS主要是依赖同步队列和state实现控制。

共享模式:

这里与独占锁大多数相同,自旋过程中的退出条件是是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态.

await

public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
} 
//当状态不为0挂起,表示当前线程被占有,需要线程排队 
protected int tryAcquireShared(int acquires) { 
    return (getState() == 0) ? 1 : -1; 
} 
//在共享模式下获取 
doAcquireSharedInterruptibly(int arg)

countDown

public void countDown() { 
    sync.releaseShared(1); 
} 
protected boolean tryReleaseShared(int releases) { 
// Decrement count; signal when transition to zero 
//自旋防止失败 
    for (;;) { 
        //获取状态
        int c = getState(); 
        //状态为为0返回false,表示没有被线程占有 
        if (c == 0) return false; 
        //调用cas来进行替换,也保证了线程安全,当为0的时候唤醒 
        int nextc = c-1; 
        if (compareAndSetState(c, nextc)) 
            return nextc == 0; 
    } 
} 
//当任务数量为0,aqs的释放共享锁 
void doReleaseShared()
private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    // 无限循环
    for (;;) {
        // 保存头节点
        Node h = head;
        // 头节点不为空并且头节点不为尾结点
        if (h != null && h != tail) { 
            // 获取头节点的等待状态
            int ws = h.waitStatus; 
            if (ws == Node.SIGNAL) { 
                // 状态为SIGNAL,CAS更新状态
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                    continue;            // loop to recheck cases
                // 释放后继结点
                unparkSuccessor(h);
            }
            // 状态为0并且更新不成功,继续
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 
                continue;                // loop on failed CAS
        }
        if (h == head) // 若头节点改变,继续循环  
            break;
    }
}

思考

  1. 如何安排线程排序

个人认为,没有进行线程的排序,而是让一部分线程进入等待,在唤醒的时候放开。

执行流程图

网络异常,图片无法展示
|

实践

用法一:一个线程等待其他多个线程都执行完毕,再继续自己的工作

public class CountDownLatchTest {
    private static Lock lock = new ReentrantLock();
    private static CountDownLatch countDownLatch = new CountDownLatch(4);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        IntStream.range(0,16).forEach(i ->{
            executorService.submit(()->{
                lock.lock();
                System.out.println(Thread.currentThread().getName()+ "来火锅店吃火锅!");
                try {
                    Thread.sleep(1000);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + "我到火锅店了,准备开吃!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
        });
        try {
            countDownLatch.await(5,TimeUnit.SECONDS);
            System.out.println("人到齐了,开饭");
            executorService.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果

网络异常,图片无法展示
|

代码中设置了一个CountDownLatch做倒计时,四个人(count为4)一起到火锅店吃饭,每到一个人计数器就减去1(countDownLatch.countDown()),当计数器为0的时候,main线程在await的阻塞结束,继续往下执行。

用法二:多个线程等待某一个线程的信号,同时开始执行

用抢位子作为例子,将线程挂起等待,同时开始执行。

public class CountDownLatchTest2 {
    private static Lock lock = new ReentrantLock();
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        IntStream.range(0,4).forEach(i ->{
            executorService.submit(()->{
                System.out.println(Thread.currentThread().getName()+ "准备开始抢位子!");
                try {
                    //Thread.sleep(1000);
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + "抢到了位置");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        });
        try {
            Thread.sleep(5000);
            System.out.println("五秒后开始抢位置");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }
}

注意点

  1. CountDownLatch是不能重用的。

总结

我们可以看到CountDownLatch的使用很简单,就当做一个计时器来使用,在控制并发方面能给我们提供帮助。

  1. 在构造器中初始化任务数量
  2. 调用await()挂起主线程main
  3. 调用countDown()方法减一,直到为0的时候,唤醒主线程可以继续运行。

上面提供的两个用法,我们也可以结合起来使用。

在实际的业务代码开发中,利用CountDownLatch来进行业务方法的执行,来确定他们的顺序,解决一个线程等待多个线程的场景

相关文章
|
1月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
152 0
|
22天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
127 59
|
13天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
26 1
|
1月前
|
Java C++
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
33 0
|
2月前
|
消息中间件 存储 前端开发
面试官:说说停止线程池的执行流程?
面试官:说说停止线程池的执行流程?
51 2
面试官:说说停止线程池的执行流程?
|
2月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
2月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
57 0
|
2月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
3月前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
59 0

相关实验场景

更多