拼多多2面,还是模拟拼团,要求用户拼团成功后,提交订单支付金额。
之前我们在系列(8)《CountDownLatch核心原理》,实现过拼团场景。但是CountDownLatch里调用countDown()方法后,线程还是可以继续执行后面的代码,没有真正的阻塞。
1、面试真题:完善模拟拼团
这里我们应用循环屏障CyclicBarrier,可以控制一组线程到达屏障点后,再全部继续执行,而且这个屏障可以重复利用的特性来实现这个场景。
现在我们模拟2人拼团成功的场景,每满2人就允许提交订单支付,且后台发送消息给仓库发货。
package lading.java.mutithread; import cn.hutool.core.date.DateTime; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** * 模拟拼团,并通知仓库发货 */ public class Demo010CyclicBarrier { public static int count = 2;//要求达到屏障数量 public static volatile ConcurrentHashMap<String, String> customerNames = new ConcurrentHashMap<>();//记录已到达屏障客户线程名 public static CyclicBarrier barrier = new CyclicBarrier(count, new sendMsg());//屏障数量2,目标线程数量达到后,执行sendMsg public static void main(String[] args) { //模拟5个人拼团 for (int i = 0; i < 5; i++) { new Thread(() -> { try { Thread.sleep((new Random().nextInt(10 - 1 + 1)) * 1000);//客户浏览商品信息Ns System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,到达屏障"); customerNames.put(Thread.currentThread().getName(), Thread.currentThread().getName());//本批次拼团名单 barrier.await();//到达屏障,进入阻塞 System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,完成支付。"); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } }, "客户00" + (i + 1)).start(); } } static class sendMsg extends Thread { public void run() { System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss") + customerNames.keySet() + "达到屏障拼团人数,允许提交订单!"); customerNames.clear();//本批次拼团名单清空 } } }
客户2,3到达后,任务线程就发送信息给参考发货,同时客户2、3可以支持订单。当客户1到达后,就阻塞等待客户4拼团才继续执行。最后客户5,因为没有成团,一直阻塞。
2、说说CyclicBarrier的核心原理
CyclicBarrier,顾名思义就是循环屏障。支持一组多个线程相互等待,线程调用了屏障的await()方法后,原地阻塞等待。当本组线程最后一个到达屏障后,本组其他线程全部被唤醒,继续执行屏障await()方法后面代码。
由于这个屏障在释放完本组等待线程后,可以重复使用,等待下一组线程过来阻塞排队,因此称为:循环屏障。
3、具体说说CyclicBarrier怎么使用
循环屏障也是很简单,核心方法就几个。首先第一个
1.CyclicBarrier(int parties, Runnable barrierAction)
就是实例化一个循环屏障,parties就是本组线程目标数量。barrierAction就有意思了,这个是可选参数。如果本组线程都到达屏障后,就先执行这个Runnable barrierAction,阻塞等待的线程才能继续执行。可以从模拟拼团实例运行结果看到:线程2、3到达屏障后,先执行sendMsg的方法,线程2、3才可以开始支付。
2.await()
线程调用这个方法后,表示已经到达屏障,该线程阻塞进入休眠状态,等本组其他线程都到达屏障点,才会被唤醒继续执行后面的代码。还有两个入参可选,await(long timeout, TimeUnit unit) 如果超出指定的等待时间,则抛出TimeoutException异常。
核心常用就这2个方法了,没别的!
4、CyclicBarrier源码分析
首先,看一下CyclicBarrier的成员变量,里面int 有parties、和count。这两个变量成功支持了屏障变成循环屏障。其中parties表示屏障阈值,count表示当前还差多少个线程到达屏障,每来一个线程调用await(),本组线程的屏障count就减1.count为0时候,就唤醒本组线程继续执行。
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation(); private int count;
其次,看一下实例化循环屏障对象代码,重点将本组循环等待线程数量parties赋值给parties、count,以及设置屏障任务。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //不指定屏障任务 public CyclicBarrier(int parties) { this(parties, null); }
然后,重点看一下await()阻塞等待的方法,里面调用了dowait()方法。
源码很长,简单总结:dowait方法就是更新count-1,表示本组又报道了一个线程,还没到的名额少了一个。如果count为0,那说明自己是本组最后来屏障集合的线程,负责唤醒大家,以及执行屏障的barrierCommand任务(如果有的话)。
源码细的说:除了count-1判断是否全部到齐,如果是0,包括如何唤醒其他线程。不是0,如何陷入阻塞等待。
具体就是:
1、如果count不是0,就把自己加入到AQS的条件队列里,等待信号唤醒。
2、如果count是0,说明本线程是最后一个到达的,咱不用进入阻塞,先执行屏障的barrierCommand,然后去唤醒本组的其他线程兄弟继续执行。并重置count值为parties阈值,方便下一组线程使用,达成屏障可循环使用的目的。
其他就是在AQS里如何阻塞等待、以及唤醒其他线程具体逻辑,源码有点复杂,等后续我们出源码分析专栏,就画图分析讲解。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁,去更新count,这里就不是CAS了 lock.lock(); try { final Generation g = generation; //判断屏障是否被中断 if (g.broken) throw new BrokenBarrierException(); //判断本线程是否已中断 if (Thread.interrupted()) { //本组线程的屏障中断,并重置屏障 breakBarrier(); throw new InterruptedException(); } // 对count减1 int index = --count; // 本组屏障全部线程都到达屏障,接下来执行屏障任务、以及唤醒本组其他阻塞兄弟 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //实例CyclicBarrier(),如果有指定任务,本线程就代劳去执行 if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果不为0,说明约定到本屏障的其他兄弟们还没到齐,那就自旋等待,直到被打断或者超时 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } ........ }
今天就分享这么多,明天我们继续分享并发编程里的Condition条件接口。