1、带着BAT大厂的面试问题去理解
请带着这些问题继续后文,会很大程度上帮助你更好的理解相关知识点。
- 什么是CountDownLatch? 闭锁,异步转同步
- CountDownLatch底层实现原理? AQS + 标志位共享锁
- CountDownLatch一次可以唤醒几个任务? 多个
- CountDownLatch有哪些主要方法? await()、countDown()
- CountDownLatch适用于什么场景? 异步转同步、多线程协作
- 写一道题:实现一个容器,提供两个方法,add,size 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束? 使用CountDownLatch 代替wait notify 好处。
2、CountDownLatch介绍
2.1、概念
在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行,(异步转同步)
- 可以用于统计多线程执行的时间。
- 可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作
2.2、可以实现的需求
- 1、解决业务上的问题:现需要解析一个excel里的多个sheet数据,使用多线程,每个线程解析其中一个sheet的数据,等到所有sheet解析完,程序提示解析完成构造函数传入int型参数做改为计数器,countDown被调用,计数器减1,await会一直阻塞程序,直至计数为0. 如果某个sheet解析较慢,可以使用带时间参数的await方法,到时间后,不再阻塞当前线程。
- 2、实现多线程的协作
2.3、分析CountDownLatch的实现原理
- 1、在AQS队列中,将线程包装为Node.SHARED节点,即标志位共享锁;
- 2、当头节点获得共享锁后,做唤醒下一个共享类型结点的操作;
- 3、头节点node1,调用
unparkSuccessor()
方法唤醒了Node2,并且调用tryAcquireShared
方法,检查下一个节点是共享节点; - 4、如果是,更改头结点,重复以上步骤,以实现节点自身获取共享锁成功后,唤醒下一个共享类型结点的操作。
2.4、什么是AQS?
1、提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架
- 使用方式是继承:
- 子类通过继承同步器并需要实现它的方法来管理其状态,管理方式是通过acquire和release方式操纵状态
- 在多线程环境中对状态的操作必须保证原子性,需要使用这个同步器提供的以下三个方法对状态进行操作
- 1、AbstractQueuedSynchronizer.getState()
- 2、AbstractQueuedSynchronizer.setState(int)
- 3、AbstractQueuedSynchronizer.compareAndSetState(int, int)
同步器是实现锁的关键
- 同步器面向的是线程访问和资源控制,他定义了线程对资源是否能够获取以及线程的排队等操作。
- 依赖于FIFO队列,队列中的node就是保存着线程引用和线程状态的容器。
- 对于一个排它锁的获取和释放
//获取: while(获取锁){ if(获取到) 退出while循环 else{ if(当前线程没有入队) 入队 阻塞当前线程 } } //释放: if(释放成功){ 删除头结点 激活原头结点的后继结点 };
- AQS详解可以参考这篇文章:JUC锁: 锁核心类AQS详解
2.5、AQS与锁(如LOCK)的对比
1、锁是面向使用者的,定义了用户调用的接口,隐藏了实现细节;
2、AQS是锁的实现者,屏蔽了同步状态管理,线程的排队,等待唤醒的底层操作;
3、锁是面向使用者,AQS是锁的具体实现者
2.6、CountDownLatch中的方法?
1、countDownLatch.await()
发生什么?
- 直接调用了AQS的
acquireSharedInterruptibly
- 当前线程就会进入了一个死循环当中,在这个死循环里面,会不断的进行判断,通过调用
tryAcquireShared
方法,如果值为0(说明共享锁没有了),会跳出循环
2、释放操作 // countDown 操作实际就是释放锁的操作,每调用一次,计数值减少1
3、限定时间的 await 方法
await(long timeout, TimeUnit unit)
异步转同步操作- spinForTimeoutThreshold 写死了1000ns,这就是所谓的自旋操作,让线程在循环中自旋,否则阻塞线程
当一个或多个线程调用await()时,调用线程会被阻塞。其它线程调用countDown()会将计数器减1 (调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。
假设一个自习室里有7个人,其中有一个是班长,班长的主要职责就是在其它6个同学走了后,关灯,锁教室门,然后走人,因此班长是需要最后一个走的,那么有什么方法能够控制班长这个线程是最后一个执行,而其它线程是随机执行的
- 在商品中心的使用场景:组装商品的属性、类目、店铺、sku等信息,然后主线程执行await方法,异步转同步
代码:参考并发编程的艺术第8章 8.1/8.2
Demo
import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 计数器 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i <= 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室"); countDownLatch.countDown(); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\t 班长最后关门"); } }
输出结果:
0 上完自习,离开教室 6 上完自习,离开教室 4 上完自习,离开教室 5 上完自习,离开教室 3 上完自习,离开教室 1 上完自习,离开教室 2 上完自习,离开教室 main 班长最后关门
枚举 + CountDownLatch,程序演示秦国统一六国
import java.util.Objects; public enum CountryEnum { ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩"); private Integer retcode; private String retMessage; CountryEnum(Integer retcode, String retMessage) { this.retcode = retcode; this.retMessage = retMessage; } public static CountryEnum forEach_countryEnum(int index) { CountryEnum[] myArray = CountryEnum.values(); for(CountryEnum ce : myArray) { if (Objects.equals(index, ce.getRetcode())) { return ce; } } return null; } public Integer getRetcode() { return retcode; } public void setRetcode(Integer retcode) { this.retcode = retcode; } public String getRetMessage() { return retMessage; } public void setRetMessage(String retMessage) { this.retMessage = retMessage; } }
import java.util.concurrent.CountDownLatch; public class UnifySixCountriesDemo { public static void main(String[] args) throws InterruptedException { // 计数器 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "国被灭了!"); countDownLatch.countDown(); }, CountryEnum.forEach_countryEnum(i).getRetMessage()).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " 秦国统一中原。"); } }
输出结果:
齐国被灭了! 燕国被灭了! 楚国被灭了! 魏国被灭了! 韩国被灭了! 赵国被灭了! main 秦国统一中原。
Action1:Java多线程 有ABC 3 个线程,线程C需要等待线程AB执行完成才能执行的实现方式?(面试题)
方法一:LockSupport + AtomicInteger
- 先执行线程C,用 park() 挂起线程C,线程A、B各自执行完成时,flag 减1并判断是否为0,若为0 则用unpark( c )给线程C 颁发许可
public static void main(String[] args) { AtomicInteger flag = new AtomicInteger(2); Thread c = new Thread(()->{ System.out.println("线程C开启,等待线程A、B执行完成才继续执行"); LockSupport.park(); System.out.println("线程C开始执行"); }); c.start(); new Thread(()->{ System.out.println("线程A开始执行"); try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程A执行完成"); if (flag.decrementAndGet() == 0){ //唤醒指定线程 LockSupport.unpark(c); } }).start(); new Thread(()->{ System.out.println("线程B开始执行"); try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程B执行完成"); if (flag.decrementAndGet() == 0){ LockSupport.unpark(c); } }).start(); }
方法二:CountDownLatch
- CountDownLatch 有一个计数器,countDown方法 对计数器做减操作,await 方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时
public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); new Thread(() -> { System.out.println("线程A开始执行"); try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程A执行完成"); }).start(); new Thread(() -> { System.out.println("线程B开始执行"); try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程B执行完成"); }).start(); new Thread(() -> { System.out.println("线程C开启,等待线程A、B执行完成才继续执行"); try { latch.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程C执行完成"); }).start(); }
方法三:CyclicBarrier
CyclicBarrier 与 CountDownLatch 类似 ,它能阻塞一组线程全部到某个状态再同时执行。 CyclicBarrier 与 CountDownLatch 的关键区别在于,所有的线程必须全部到达位置,才能继续执行。 CountDownLatch 用于等待事件,而 CyclicBarrier 用于等待其他线程,在任意一个线程没有完成之前,所有线程都不能继续执行。
public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3); //只有所有线程执行到了 await(),所有线程才会继续往下执行 new Thread(() -> { System.out.println("线程A开始执行"); try { //执行业务 TimeUnit.SECONDS.sleep(new Random().nextInt(10)); System.out.println("线程A执行完成,等待其它线程一起冲破栅栏"); barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程A执行完成"); }).start(); new Thread(() -> { System.out.println("线程B开始执行"); try { //执行业务 TimeUnit.SECONDS.sleep(new Random().nextInt(10)); System.out.println("线程B执行完成,等待其它线程一起冲破栅栏"); barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("线程B执行完成"); }).start(); new Thread(() -> { try { System.out.println("线程C开启,等待线程AB执行完成一起冲破栅栏"); barrier.await(); //执行业务 } catch (Exception e) { e.printStackTrace(); } System.out.println("线程C执行完成"); }).start(); }
和CountDonwLatch对比
- CountDownLatch减计数,CyclicBarrier加计数。
- CountDownLatch是一次性的,CyclicBarrier可以重用。
- CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。
Action2、使用CountDownLatch 进行异步转同步操作,每个线程退出前必须调用countDown方法,线程执行代码注意catch异常,确保 countDown 方法被执行到,避免主线程无法执行至 await 方法,直到超时才返回结果。
- 说明:注意:子线程抛出异常堆栈,不能在主线程try-catch到。
3、CountDownLatch源码分析
从源码可知,其底层是由AQS提供支持,所以其数据结构可以参考AQS的数据结构,而AQS的数据结构核心就是两个虚拟队列:同步队列sync queue 和条件队列condition queue (给互斥锁使用),不同的条件会有不同的条件队列。CountDownLatch典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的 CountDownLatch。当每一个任务完成时,都会在这个锁存器上调用countDown,等待问题被解决的任务调用这个锁存器的await,将他们自己拦住,直至锁存器计数结束。
3.1、类的继承关系
CountDownLatch 没有显示继承哪个父类或者实现哪个父接口,它底层是AQS是通过内部类Sync来实现的。
public class CountDownLatch {}
3.2、类的内部类
CountDownLatch类存在一个内部类Sync,继承自AbstractQueuedSynchronizer,其源代码如下。
private static final class Sync extends AbstractQueuedSynchronizer { // 版本号 private static final long serialVersionUID = 4982264981922014374L; // 构造器 Sync(int count) { setState(count); } // 返回当前计数 int getCount() { return getState(); } // 试图在共享模式下获取对象状态 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 试图设置状态来反映共享模式下的一个释放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 无限循环 for (;;) { // 获取状态 int c = getState(); if (c == 0) // 没有被线程占有 return false; // 下一个状态 int nextc = c-1; if (compareAndSetState(c, nextc)) // 比较并且设置成功 return nextc == 0; } } }
说明:对CountDownLatch方法的调用会转发到对Sync或AQS的方法的调用,所以,AQS对CountDownLatch提供支持。
3.3、类的属性
可以看到CountDownLatch类的内部只有一个Sync类型的属性:
public class CountDownLatch { // 同步队列 private final Sync sync; }
3.4、类的构造函数
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化状态数 this.sync = new Sync(count); }
说明: 该构造函数可以构造一个用给定计数初始化的 CountDownLatch,并且构造函数内完成了sync的初始化,并设置了状态数。
3.5、核心函数 - await函数
此函数将会使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。其源码如下
public void await() throws InterruptedException { // 转发到sync对象上 sync.acquireSharedInterruptibly(1); }
说明: 由源码可知,对 CountDownLatch 对象的await的调用会转发为对Sync的 acquireSharedInterruptibly(从AQS继承的方法) 方法的调用。
- acquireSharedInterruptibly源码如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
说明: 从源码中可知,acquireSharedInterruptibly 又调用了CountDownLatch的内部类Sync的 tryAcquireShared 和AQS的doAcquireSharedInterruptibly 函数。
- tryAcquireShared函数的源码如下:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
说明: 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1。
- doAcquireSharedInterruptibly 函数的源码如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 添加节点至等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 无限循环 for (;;) { // 获取node的前驱节点 final Node p = node.predecessor(); if (p == head) { // 前驱节点为头节点 // 试图在共享模式下获取对象状态 int r = tryAcquireShared(arg); if (r >= 0) { // 获取成功 // 设置头节点并进行繁殖 setHeadAndPropagate(node, r); // 设置节点next域 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 在获取失败后是否需要禁止线程并且进行中断检查 // 抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
说明: 在AQS的 doAcquireSharedInterruptibly
中可能会再次调用CountDownLatch的内部类Sync的 tryAcquireShared
方法和AQS的setHeadAndPropagate
方法。
- setHeadAndPropagate 方法源码如下。
private void setHeadAndPropagate(Node node, int propagate) { // 获取头节点 Node h = head; // Record old head for check below // 设置头节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 进行判断 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取节点的后继 Node s = node.next; if (s == null || s.isShared()) // 后继为空或者为共享模式 // 以共享模式进行释放 doReleaseShared(); } }
说明: 该方法设置头节点并且释放头节点后面的满足条件的结点,该方法中可能会调用到AQS的doReleaseShared方法,其源码如下。
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ // 无限循环 for (;;) { // 保存头节点 Node h = head; if (h != null && h != tail) { // 头节点不为空并且头节点不为尾结点 // 获取头节点的等待状态 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 状态为SIGNAL if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续 continue; // loop to recheck cases // 释放后继结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续 continue; // loop on failed CAS } if (h == head) // 若头节点改变,继续循环 break; } }
说明: 该方法在共享模式下释放,具体的流程再之后会通过一个示例给出。
所以,对CountDownLatch的await调用大致会有如下的调用链。
说明: 上图给出了可能会调用到的主要方法,并非一定会调用到,之后,会通过一个示例给出详细的分析。
3.6、核心函数 - countDown函数
此函数将递减锁存器的计数,如果计数到达零,则释放所有等待的线程
public void countDown() { sync.releaseShared(1); }
说明: 对countDown的调用转换为对Sync对象的releaseShared(从AQS继承而来)方法的调用。
- releaseShared源码如下
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
说明: 此函数会以共享模式释放对象,并且在函数中会调用到 CountDownLatch 的 tryReleaseShared 函数,并且可能会调用AQS的doReleaseShared 函数。
- tryReleaseShared 源码如下
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 无限循环 for (;;) { // 获取状态 int c = getState(); if (c == 0) // 没有被线程占有 return false; // 下一个状态 int nextc = c-1; if (compareAndSetState(c, nextc)) // 比较并且设置成功 return nextc == 0; } }
说明: 此函数会试图设置状态来反映共享模式下的一个释放。具体的流程在下面的示例中会进行分析。
- AQS的doReleaseShared的源码如下
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ // 无限循环 for (;;) { // 保存头节点 Node h = head; // 头节点不为空并且头节点不为尾结点 if (h != null && h != tail) { // 获取头节点的等待状态 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 状态为SIGNAL if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续 continue; // loop to recheck cases // 释放后继结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续 continue; // loop on failed CAS } if (h == head) // 若头节点改变,继续循环 break; } }
说明: 此函数在共享模式下释放资源。
所以,对CountDownLatch的countDown调用大致会有如下的调用链。
说明: 上图给出了可能会调用到的主要方法,并非一定会调用到,之后,会通过一个示例给出详细的分析。
4、CountDownLatch示例
下面给出了一个使用CountDownLatch的示例。
import java.util.concurrent.CountDownLatch; class MyThread extends Thread { private CountDownLatch countDownLatch; public MyThread(String name, CountDownLatch countDownLatch) { super(name); this.countDownLatch = countDownLatch; } public void run() { System.out.println(Thread.currentThread().getName() + " doing something"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " finish"); countDownLatch.countDown(); } } public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(2); MyThread t1 = new MyThread("t1", countDownLatch); MyThread t2 = new MyThread("t2", countDownLatch); t1.start(); t2.start(); System.out.println("Waiting for t1 thread and t2 thread to finish"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " continue"); } }
运行结果(某一次):
Waiting for t1 thread and t2 thread to finish t1 doing something t2 doing something t1 finish t2 finish main continue
说明: 本程序首先计数器初始化为2。根据结果,可能会存在如下的一种时序图。
说明: 首先main线程会调用await操作,此时main线程会被阻塞,等待被唤醒,之后t1线程执行了countDown操作,最后,t2线程执行了countDown操作,此时main线程就被唤醒了,可以继续运行。下面,进行详细分析。
- main线程执行 countDownLatch.await 操作,主要调用的函数如下。
说明: 在最后,main线程就被park了,即禁止运行了。此时Sync queue(同步队列)中有两个节点,AQS的state为2,包含main线程的结点的 nextWaiter 指向SHARED结点。
- t1线程执行 countDownLatch.countDown操作,主要调用的函数如下。
说明: 此时,Sync queue队列里的结点个数未发生变化,但是此时,AQS的state已经变为1了。
- t2线程执行 countDownLatch.countDown 操作,主要调用的函数如下。
说明: 经过调用后,AQS的state为0,并且此时,main线程会被unpark,可以继续运行。当main线程获取cpu资源后,继续运行。
- main线程获取cpu资源,继续运行,由于main线程是在 parkAndCheckInterrupt 函数中被禁止的,所以此时,继续在parkAndCheckInterrupt 函数运行。
说明: main线程恢复,继续在parkAndCheckInterrupt
函数中运行,之后又会回到最终达到的状态为AQS的state为0,并且head与tail指向同一个结点,该节点的nextWaiter域还是指向SHARED结点。
5、更深入理解
5.1、写道面试题
实现一个容器,提供两个方法,add,size 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束.
5.2、使用wait和notify实现
import java.util.ArrayList; import java.util.List; /** * 必须先让t2先进行启动 使用 wait 和 notify 进行相互通讯,wait会释放锁,notify不会释放锁 */ public class T2 { volatile List list = new ArrayList(); public void add (int i){ list.add(i); } public int getSize(){ return list.size(); } public static void main(String[] args) { T2 t2 = new T2(); Object lock = new Object(); new Thread(() -> { synchronized(lock){ System.out.println("t2 启动"); if(t2.getSize() != 5){ try { /**会释放锁*/ lock.wait(); System.out.println("t2 结束"); } catch (InterruptedException e) { e.printStackTrace(); } } lock.notify(); } },"t2").start(); new Thread(() -> { synchronized (lock){ System.out.println("t1 启动"); for (int i=0;i<9;i++){ t2.add(i); System.out.println("add"+i); if(t2.getSize() == 5){ /**不会释放锁*/ lock.notify(); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } }
输出:
t2 启动 t1 启动 add0 add1 add2 add3 add4 t2 结束 add5 add6 add7 add8
5.3、CountDownLatch实现
说出使用CountDownLatch 代替wait notify 好处?
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 使用CountDownLatch 代替wait notify 好处是通讯方式简单,不涉及锁定 Count 值为0时当前线程继续执行, */ public class T3 { volatile List list = new ArrayList(); public void add(int i){ list.add(i); } public int getSize(){ return list.size(); } public static void main(String[] args) { T3 t = new T3(); CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2 start"); if(t.getSize() != 5){ try { countDownLatch.await(); System.out.println("t2 end"); } catch (InterruptedException e) { e.printStackTrace(); } } },"t2").start(); new Thread(()->{ System.out.println("t1 start"); for (int i = 0;i<9;i++){ t.add(i); System.out.println("add"+ i); if(t.getSize() == 5){ System.out.println("countdown is open"); countDownLatch.countDown(); } } System.out.println("t1 end"); },"t1").start(); } }
5.4、CountDownLatch在项目中的使用
// 在审核业务中的使用
try { if(CollectionUtils.isEmpty(needAuditApply)){ return; } CountDownLatch countDownLatch = new CountDownLatch(needAuditApply.size()); for (ApplyContext applyContext : needAuditApply){ BatchApplyAuditThreadPool.getInstance().execute(() -> { try { Response<ApplyResult> applyResult = apply(applyContext); responseMap.put(applyContext.getItem().getId(), applyResult); } catch (Exception e){ responseMap.put(applyContext.getItem().getId(), Response.fail("提交审核发生异常")); log.error("多线程调用工作流提交审核流程异常, applyContext:{}", ImJsonUtils.objToJson(applyContext), e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(15, TimeUnit.SECONDS); } catch (InterruptedException e){ log.error("多线程调用工作流提交审核流程线程发生中断异常,applyContext:{}", ImJsonUtils.objToJson(needAuditApply), e); }
5.5、CountDownLatch在中间件中的使用
场景1:消息中间RocketMQ
- BrokerOuterAPI @registerBrokerAll()
// 使用 CountDownLatch 等到向所有 namesrv 注册完毕,才会放行 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { }
场景2:分布式协调器ZK
- ZK实现分布式锁,可以参考这篇文章
- todo