CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(下)

简介: CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(下)

CyclicBarrier


上面简单说了一下 CyclicBarrier 被创造出来的理由,这里先看一下它的字面解释:


微信图片_20220511110850.png

概念总是有些抽象,我们将上面的例子用 CyclicBarrier 再做个改动,先让大家有个直观的使用概念


@Slf4j
public class CyclicBarrierExample {
   // 创建 CyclicBarrier 实例,计数器的值设置为2
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
   public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      int breakCount = 0;
         // 将线程提交到线程池
      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(1000);
            cyclicBarrier.await();
            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(2000);
            cyclicBarrier.await();
            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         } 
      });
      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(2000);
            cyclicBarrier.await();
            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(1000);
            cyclicBarrier.await();
            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         }
      });
      executorService.shutdown();
   }
}


运行结果:


微信图片_20220511110935.png


结合程序代码与运行结果,我们可以看出,子线程执行完第一回合后(执行回合所需时间不同),都会调用 await() 方法,等所有线程都到达屏障点后,会突破屏障继而执行第二回合,同样的道理最终到达第三回合


形象化的展示上述示例的运行过程


微信图片_20220511111001.png


看到这里,你应该明白 CyclicBarrier 的基本用法,但随之你内心也应该有了一些疑问:


  1. 怎么判断所有线程都到达屏障点的?


  1. 突破某一屏障后,又是怎么重置 CyclicBarrier 计数器,等待线程再一次突破屏障呢?


带着这些问题我们来看一看源码


源码分析


同样先打开 CyclicBarrier 的类结构,展开类全部内容,其实也没多少内容


微信图片_20220511111041.png


从类结构中看到有:


  1. await() 方法,猜测应该和 CountDownLatch 是类似的,都是获取同步状态,阻塞自己


  1. ReentrantLock,CyclicBarrier 内部竟然也用到了我们之前讲过的 ReentrantLock,猜测这个锁一定保护 CyclicBarrier 的某个变量,那肯定也是基于 AQS 相关知识了


  1. Condition,存在条件,猜测会有等待/通知机制的运用


我们继续带着这些猜测,结合上面的实例代码一点点来验证


// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);


查看构造函数 (这里的英文注释舍不得删掉,因为说的太清楚了,我来结合注释来说明一下):


private final int parties;
private int count;
public CyclicBarrier(int parties) {
    this(parties, null);
}
    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }


根据注释说明,parties 代表冲破屏障之前要触发的线程总数,count 本身又是计数器,那问题来了


直接就用 count 不就可以了嘛?为啥同样用于初始化计数器,要维护两个变量呢?


从 parties 和 count 的变量声明中,你也能看出一些门道,前者有 final 修饰,初始化后就不可以改变了,因为 CyclicBarrier 的设计目的是可以循环利用的,所以始终用 parties 来记录线程总数,当 count 计数器变为 0 后,如果没有 parties 的值赋给它,怎么进行重新复用再次计数呢,所以这里维护两个变量很有必要


接下来就看看 await() 到底是怎么实现的


// 从方法签名上可以看出,该方法同样可以被中断,另外还有一个 BrokenBarrierException 异常,我们一会看
public int await() throws InterruptedException, BrokenBarrierException {
    try {
          // 调用内部 dowait 方法, 第一个参数为 false,表示不设置超时时间,第二个参数也就没了意义
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}


接下来看看 dowait(false, 0L) 做了哪些事情 (这个方法内容有点多,别担心,逻辑并不复杂,请看关键代码注释)


private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 还记得之前说过的 Lock 标准范式吗? JDK 内部都是这么使用的,你一定也要遵循范式
    lock.lock();
    try {
        final Generation g = generation;
          // broken 是静态内部类 Generation唯一的一个成员变量,用于记录当前屏障是否被打破,如果打破,则抛出 BrokenBarrierException 异常
          // 这里感觉挺困惑的,我们要【冲破】屏障,这里【打破】屏障却抛出异常,注意我这里的用词
        if (g.broken)
            throw new BrokenBarrierException();
          // 如果线程被中断,则会通过 breakBarrier 方法将 broken 设置为true,也就是说,如果有线程收到中断通知,直接就打破屏障,停止 CyclicBarrier, 并唤醒所有线程
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
          // ************************************
          // 因为 breakBarrier 方法在这里会被调用多次,为了便于大家理解,我直接将 breakBarrier 代码插入到这里
          private void breakBarrier() {
          // 将打破屏障标识 设置为 true
          generation.broken = true;
          // 重置计数器
          count = parties;
          // 唤醒所有等待的线程
          trip.signalAll();
            }
          // ************************************
                // 每当一个线程调用 await 方法,计数器 count 就会减1
        int index = --count;
          // 当 count 值减到 0 时,说明这是最后一个调用 await() 的子线程,则会突破屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                  // 获取构造函数中的 barrierCommand,如果有值,则运行该方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                  // 激活其他因调用 await 方法而被阻塞的线程,并重置 CyclicBarrier
                nextGeneration();
                // ************************************
                // 为了便于大家理解,我直接将 nextGeneration 实现插入到这里
                private void nextGeneration() {
                    // signal completion of last generation
                    trip.signalAll();
                    // set up next generation
                    count = parties;
                    generation = new Generation();
                }
                // ************************************
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
          // index 不等于0, 说明当前不是最后一个线程调用 await 方法
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                  // 没有设置超时时间
                if (!timed)
                      // 进入条件等待
                    trip.await();
                else if (nanos > 0L)
                      // 否则,判断超时时间,这个我们在 AQS 中有说明过,包括为什么最后超时阈值 spinForTimeoutThreshold 不再比较的原因,大家会看就好
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                  // 条件等待被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
              // 如果新一轮回环结束,会通过 nextGeneration 方法新建 generation 对象
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}


doWait 就是 CyclicBarrier 的核心逻辑, 可以看出,该方法入口使用了 ReentrantLock,这也就是为什么 Generation broken 变量没有被声明为 volatile 类型保持可见性,因为对其的更改都是在锁的内部,同样在锁的内部对计数器 count 做更新,也保证了原子性


doWait 方法中,是通过 nextGeneration 方法来重新初始化/重置 CyclicBarrier 状态的,该类中还有一个 reset() 方法,也是重置 CyclicBarrier 状态的


public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}


但 reset() 方法并没有在 CyclicBarrier 内部被调用,显然是给 CyclicBarrier 使用者来调用的,那问题来了


什么时候调用 reset() 方法呢


正常情况下,CyclicBarrier 是会被自动重置状态的,从 reset 的方法实现中可以看出调用了 breakBarrier

方法,也就是说,调用 reset 会使当前处在等待中的线程最终抛出 BrokenBarrierException 并立即被唤醒,所以说 reset() 只会在你想打破屏障时才会使用


微信图片_20220511111336.png


上述示例,我们构建 CyclicBarrier 对象时,并没有传递 barrierCommand 对象, 我们修改示例传入一个 barrierCommand 对象,看看会有什么结果:


// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   log.info("全部运行结束");
});


运行结果:


微信图片_20220511111426.png


从运行结果中来看,每次冲破屏障后都会执行 CyclicBarrier 初始化 barrierCommand 的方法, 这与我们对 doWait() 方法的分析完全吻合,从上面的运行结果中可以看出,最后一个线程是运行 barrierCommand run() 方法的线程,我们再来形象化的展示一下整个过程


微信图片_20220511111534.png


从上图可以看出,barrierAction 与每次突破屏障是串行化的执行过程,假如 barrierAction 是很耗时的汇总操作,那这就是可以优化的点了,我们继续修改代码


// 创建单线程线程池
private static Executor executor = Executors.newSingleThreadExecutor();
// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   executor.execute(() -> gather());
});
private static void gather() {
   try {
      Thread.sleep(2000);
   } catch (InterruptedException e) {
      e.printStackTrace();
   }
   log.info("全部运行结束");
}


我们这里将 CyclicBarrier 的回调函数 barrierAction使用单线程的线程池,这样最后一个冲破屏障的线程就不用等待 barrierAction 的执行,直接分配个线程池里的线程异步执行,进一步提升效率


运行结果如下:


微信图片_20220511111733.png


我们再形象化的看一下整个过程:


微信图片_20220511112230.png


这里使用了单一线程池,增加了并行操作,提高了程序运行效率,那问题来了:


如果 barrierAction 非常非常耗时,冲破屏障的任务就可能堆积在单一线程池的等待队列中,就存在 OOM 的风险,那怎么办呢?


这是就要需要一定的限流策略或者使用线程池的拒绝的略等


那把单一线程池换成非单一的固定线程池不就可以了嘛?比如 fixed(5)


乍一看确实能缓解单线程池可能引起的任务堆积问题,上面代码我们看到的 gather() 方法,假如该方法内部没有使用锁或者说存在竟态条件,那 CyclicBarrier 的回调函数 barrierAction 使用多线程必定引起结果的不准确


所以在实际使用中还要结合具体的业务场景不断优化代码,使之更加健壮


总结


本文讲解了 CountDownLatch 和 CyclicBarrier 的经典使用场景以及实现原理,以及在使用过程中可能会遇到的问题,比如将大的 list 拆分作业就可以用到前者,读取多个 Excel 的sheet 页,最后进行结果汇总就可以用到后者 (文中完整示例代码已上传)


最后,再形象化的比喻一下


  • CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有游客到齐才能去下一个景点


  • 而 CyclicBarrier 是一组线程之间的相互等待,可以类比几个驴友之间的不离不弃,共同到达某个地方,再继续出发,这样反复


灵魂追问


  1. 怎样拿到 CyclicBarrier 的汇总结果呢?


  1. 线程池中的 Future 特性你有使用过吗?


接下来,咱们就聊聊那些可以使用的 Future 特性


相关文章
|
Java 测试技术 Maven
看到一个魔改线程池,面试素材加一!(中)
看到一个魔改线程池,面试素材加一!(中)
406 0
看到一个魔改线程池,面试素材加一!(中)
|
4月前
|
Java
java多线程售票例子
java多线程售票例子
1.5w字,30图带你彻底掌握 AQS!(建议收藏)
AQS( AbstractQueuedSynchronizer )是一个用来构建锁和同步器(所谓同步,是指线程之间的通信、协作)的框架,Lock 包中的各种锁(如常见的 ReentrantLock, ReadWriteLock), concurrent它包中的各种同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基于 AQS 来构建,所以理解 AQS 的实现原理至关重要,AQS 也是面试中区分候选人的常见考点,我们务必要掌握,本文将用循序渐进地介绍 AQS,相信大家看完一定有收获。文章目录如下
|
4月前
|
安全
带你手搓阻塞队列——自定义实现
带你手搓阻塞队列——自定义实现
60 0
|
Java 调度 Android开发
七千字带你深入JUC线程基础
七千字带你深入JUC线程基础
130 0
七千字带你深入JUC线程基础
|
Java
这篇 ReentrantLock 看不懂,加我我给你发红包(三)
在开始本篇文章的内容讲述前,先来回答我一个问题,为什么 JDK 提供一个 synchronized 关键字之后还要提供一个 Lock 锁,这不是多此一举吗?难道 JDK 设计人员都是沙雕吗?
126 1
这篇 ReentrantLock 看不懂,加我我给你发红包(三)
|
Java 调度
这篇 ReentrantLock 看不懂,加我我给你发红包(二)
在开始本篇文章的内容讲述前,先来回答我一个问题,为什么 JDK 提供一个 synchronized 关键字之后还要提供一个 Lock 锁,这不是多此一举吗?难道 JDK 设计人员都是沙雕吗?
89 0
这篇 ReentrantLock 看不懂,加我我给你发红包(二)
|
Java 调度
这篇 ReentrantLock 看不懂,加我我给你发红包(一)
在开始本篇文章的内容讲述前,先来回答我一个问题,为什么 JDK 提供一个 synchronized 关键字之后还要提供一个 Lock 锁,这不是多此一举吗?难道 JDK 设计人员都是沙雕吗?
79 0
这篇 ReentrantLock 看不懂,加我我给你发红包(一)
|
Oracle Java 关系型数据库
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(上)
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(上)
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(上)