JAVA并发编程系列(9)CyclicBarrier循环屏障原理分析

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文介绍了拼多多面试中的模拟拼团问题,通过使用 `CyclicBarrier` 实现了多人拼团成功后提交订单并支付的功能。与之前的 `CountDownLatch` 方法不同,`CyclicBarrier` 能够确保所有线程到达屏障点后继续执行,并且屏障可重复使用。文章详细解析了 `CyclicBarrier` 的核心原理及使用方法,并通过代码示例展示了其工作流程。最后,文章还提供了 `CyclicBarrier` 的源码分析,帮助读者深入理解其实现机制。

拼多多2面,还是模拟拼团,要求用户拼团成功后,提交订单支付金额。

     之前我们在系列(8)《CountDownLatch核心原理》,实现过拼团场景。但是CountDownLatch里调用countDown()方法后,线程还是可以继续执行后面的代码,没有真正的阻塞。


1、面试真题:完善模拟拼团

      这里我们应用循环屏障CyclicBarrier,可以控制一组线程到达屏障点后,再全部继续执行,而且这个屏障可以重复利用的特性来实现这个场景。

现在我们模拟2人拼团成功的场景,每满2人就允许提交订单支付,且后台发送消息给仓库发货。


package lading.java.mutithread;
import cn.hutool.core.date.DateTime;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
 * 模拟拼团,并通知仓库发货
 */
public class Demo010CyclicBarrier {
    public static int count = 2;//要求达到屏障数量
    public static volatile ConcurrentHashMap<String, String> customerNames = new ConcurrentHashMap<>();//记录已到达屏障客户线程名
    public static CyclicBarrier barrier = new CyclicBarrier(count, new sendMsg());//屏障数量2,目标线程数量达到后,执行sendMsg
    public static void main(String[] args) {
        //模拟5个人拼团
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep((new Random().nextInt(10 - 1 + 1)) * 1000);//客户浏览商品信息Ns
                    System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,到达屏障");
                    customerNames.put(Thread.currentThread().getName(), Thread.currentThread().getName());//本批次拼团名单
                    barrier.await();//到达屏障,进入阻塞
                    System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,完成支付。");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }, "客户00" + (i + 1)).start();
        }
    }
    static class sendMsg extends Thread {
        @Override
        public void run() {
            System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss") + customerNames.keySet() + "达到屏障拼团人数,允许提交订单!");
            customerNames.clear();//本批次拼团名单清空
        }
    }
}


客户2,3到达后,任务线程就发送信息给参考发货,同时客户2、3可以支持订单。当客户1到达后,就阻塞等待客户4拼团才继续执行。最后客户5,因为没有成团,一直阻塞。



2、说说CyclicBarrier的核心原理

     CyclicBarrier,顾名思义就是循环屏障。支持一组多个线程相互等待,线程调用了屏障的await()方法后,原地阻塞等待。当本组线程最后一个到达屏障后,本组其他线程全部被唤醒,继续执行屏障await()方法后面代码。

     由于这个屏障在释放完本组等待线程后,可以重复使用,等待下一组线程过来阻塞排队,因此称为:循环屏障。


3、具体说说CyclicBarrier怎么使用

    循环屏障也是很简单,核心方法就几个。首先第一个

1.CyclicBarrier(int parties, Runnable barrierAction)

        就是实例化一个循环屏障,parties就是本组线程目标数量。barrierAction就有意思了,这个是可选参数。如果本组线程都到达屏障后,就先执行这个Runnable barrierAction,阻塞等待的线程才能继续执行。可以从模拟拼团实例运行结果看到:线程2、3到达屏障后,先执行sendMsg的方法,线程2、3才可以开始支付。


2.await()

     线程调用这个方法后,表示已经到达屏障,该线程阻塞进入休眠状态,等本组其他线程都到达屏障点,才会被唤醒继续执行后面的代码。还有两个入参可选,await(long timeout, TimeUnit unit) 如果超出指定的等待时间,则抛出TimeoutException异常。


核心常用就这2个方法了,没别的!


4、CyclicBarrier源码分析

      首先,看一下CyclicBarrier的成员变量,里面int 有parties、和count。这两个变量成功支持了屏障变成循环屏障。其中parties表示屏障阈值,count表示当前还差多少个线程到达屏障,每来一个线程调用await(),本组线程的屏障count就减1.count为0时候,就唤醒本组线程继续执行。


private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private Generation generation = new Generation();
    private int count;

     其次,看一下实例化循环屏障对象代码,重点将本组循环等待线程数量parties赋值给parties、count,以及设置屏障任务。


public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    //不指定屏障任务
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

然后,重点看一下await()阻塞等待的方法,里面调用了dowait()方法。

      源码很长,简单总结:dowait方法就是更新count-1,表示本组又报道了一个线程,还没到的名额少了一个。如果count为0,那说明自己是本组最后来屏障集合的线程,负责唤醒大家,以及执行屏障的barrierCommand任务(如果有的话)。

      源码细的说:除了count-1判断是否全部到齐,如果是0,包括如何唤醒其他线程。不是0,如何陷入阻塞等待。


具体就是:

1、如果count不是0,就把自己加入到AQS的条件队列里,等待信号唤醒。

2、如果count是0,说明本线程是最后一个到达的,咱不用进入阻塞,先执行屏障的barrierCommand,然后去唤醒本组的其他线程兄弟继续执行。并重置count值为parties阈值,方便下一组线程使用,达成屏障可循环使用的目的。

其他就是在AQS里如何阻塞等待、以及唤醒其他线程具体逻辑,源码有点复杂,等后续我们出源码分析专栏,就画图分析讲解。


private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加锁,去更新count,这里就不是CAS了
        lock.lock();
        try {
            final Generation g = generation;
            //判断屏障是否被中断
            if (g.broken)
                throw new BrokenBarrierException();
            //判断本线程是否已中断
            if (Thread.interrupted()) {
                //本组线程的屏障中断,并重置屏障
                breakBarrier();
                throw new InterruptedException();
            }
            // 对count减1
            int index = --count;
            // 本组屏障全部线程都到达屏障,接下来执行屏障任务、以及唤醒本组其他阻塞兄弟
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //实例CyclicBarrier(),如果有指定任务,本线程就代劳去执行
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 如果不为0,说明约定到本屏障的其他兄弟们还没到齐,那就自旋等待,直到被打断或者超时
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } 
      ........
    }

今天就分享这么多,明天我们继续分享并发编程里的Condition条件接口。

相关文章
|
2天前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
14 5
|
2天前
|
Java
Java中的多线程编程:从基础到实践
本文深入探讨Java多线程编程,首先介绍多线程的基本概念和重要性,接着详细讲解如何在Java中创建和管理线程,最后通过实例演示多线程的实际应用。文章旨在帮助读者理解多线程的核心原理,掌握基本的多线程操作,并能够在实际项目中灵活运用多线程技术。
|
2天前
|
Java 程序员 开发者
Java编程中的异常处理艺术
【10月更文挑战第24天】在Java的世界里,代码就像一场精心编排的舞蹈,每一个动作都要精准无误。但就像最完美的舞者也可能踩错一个步伐一样,我们的程序偶尔也会遇到意外——这就是所谓的异常。本文将带你走进Java的异常处理机制,从基本的try-catch语句到高级的异常链追踪,让你学会如何优雅地处理这些不请自来的“客人”。
|
2天前
|
设计模式 SQL 安全
Java编程中的单例模式深入解析
【10月更文挑战第24天】在软件工程中,单例模式是设计模式的一种,它确保一个类只有一个实例,并提供一个全局访问点。本文将探讨如何在Java中使用单例模式,并分析其优缺点以及适用场景。
6 0
|
2天前
|
存储 Java
在Java编程的世界里,标识符命名是一项基础且至关重要的技能
在Java编程的世界里,标识符命名是一项基础且至关重要的技能
7 0
|
3天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
70 38
|
5天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
18 1
[Java]线程生命周期与线程通信
|
2天前
|
安全 Java
在 Java 中使用实现 Runnable 接口的方式创建线程
【10月更文挑战第22天】通过以上内容的介绍,相信你已经对在 Java 中如何使用实现 Runnable 接口的方式创建线程有了更深入的了解。在实际应用中,需要根据具体的需求和场景,合理选择线程创建方式,并注意线程安全、同步、通信等相关问题,以确保程序的正确性和稳定性。
|
3天前
|
监控 安全 Java
Java多线程编程的艺术与实践
【10月更文挑战第22天】 在现代软件开发中,多线程编程是一项不可或缺的技能。本文将深入探讨Java多线程编程的核心概念、常见问题以及最佳实践,帮助开发者掌握这一强大的工具。我们将从基础概念入手,逐步深入到高级主题,包括线程的创建与管理、同步机制、线程池的使用等。通过实际案例分析,本文旨在提供一种系统化的学习方法,使读者能够在实际项目中灵活运用多线程技术。