CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(上)

简介: CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(上)

微信图片_20220511105331.jpg


  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough


微信图片_20220511105400.png


前言


并发编程的三大核心是分工同步互斥。在日常开发中,经常会碰到需要在主线程中开启多个子线程去并行的执行任务,并且主线程需要等待所有子线程执行完毕再进行汇总的场景,这就涉及到分工与同步的内容了


在讲 有序性可见性,Happens-before来搞定 时,提到过 join() 规则,使用 join() 就可以简单的实现上述场景:


@Slf4j
public class JoinExample {
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 执行完毕");
            }
        }, "Thread-1");
        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 执行完毕");
            }
        }, "Thread-2");
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        log.info("主线程执行完毕");
    }
}


运行结果:


微信图片_20220511105503.png


整个过程可以这么理解


微信图片_20220511105539.png


我们来查看 join() 的实现源码:


微信图片_20220511105559.png


其实现原理是不停的检查 join 线程是否存活,如果 join 线程存活,则 wait(0) 永远的等下去,直至 join 线程终止后,线程的 this.notifyAll() 方法会被调用(该方法是在 JVM 中实现的,JDK 中并不会看到源码),退出循环恢复主线程执行。很显然这种循环检查的方式比较低效


除此之外,使用 join() 缺少很多灵活性,比如实际项目中很少让自己单独创建线程(原因在 我会手动创建线程,为什么要使用线程池? 中说过)而是使用 Executor, 这进一步减少了 join() 的使用场景,所以 join() 的使用在多数是停留在 demo 演示上


那如何实现文中开头提到的场景呢?


CountDownLatch


CountDownLatch, 直译过来【数量向下门闩】,那肯定里面有计数器的存在了。我们将上述程序用 CountDownLatch 实现一下,先让大家有个直观印象


@Slf4j
public class CountDownLatchExample {
    private static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        // 这里不推荐这样创建线程池,最好通过 ThreadPoolExecutor 手动创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 执行完毕");
                //计数器减1
                countDownLatch.countDown();
            }
        });
        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 执行完毕");
                //计数器减1
                countDownLatch.countDown();
            }
        });
        log.info("主线程等待子线程执行完毕");
        log.info("计数器值为:" + countDownLatch.getCount());
        countDownLatch.await();
        log.info("计数器值为:" + countDownLatch.getCount());
        log.info("主线程执行完毕");
        executorService.shutdown();
    }
}


运行结果如下:


微信图片_20220511105717.png


结合上述示例的运行结果,相信你也能猜出 CountDownLatch 的实现原理了:


  1. 初始化计数器数值,比如为2


  1. 子线程执行完则调用 countDownLatch.countDown() 方法将计数器数值减1


  1. 主线程调用 await() 方法阻塞自己,直至计数器数值为0(即子线程全部执行结束)


不知道你是否注意, countDownLatch.countDown(); 这行代码可以写在子线程执行的任意位置,不像 join() 要完全等待子线程执行完,这也是 CountDownLatch 灵活性的一种体现


上述的例子还是过于简单,Oracle 官网 CountDownLatch 说明 有两个非常经典的使用场景,示例很简单,强烈建议查看相关示例代码,打开使用思路。我将两个示例代码以图片的形式展示在此处:


官网示例1


  • 第一个是开始信号 startSignal,阻止任何工人 Worker 继续工作,直到司机 Driver 准备好让他们继续工作


  • 第二个是完成信号 doneSignal,允许司机 Driver 等待,直到所有的工人 Worker 完成。


微信图片_20220511105803.png


官网示例2


另一种典型的用法是将一个问题分成 N 个部分 (比如将一个大的 list 拆分成多分,每个 Worker 干一部分),Worker 执行完自己所处理的部分后,计数器减1,当所有子部分完成后,Driver 才继续向下执行


微信图片_20220511105920.png


结合官网示例,相信你已经可以结合你自己的业务场景解,通过 CountDownLatch 解决一些串行瓶颈来提高运行效率了,会用还远远不够,咱得知道 CountDownLatch 的实现原理


源码分析


CountDownLatch 是 AQS 实现中的最后一个内容,有了前序文章的知识铺垫:




当你看到 CountDownLatch 的源码内容,你会高兴的笑起来,内容真是太少了


微信图片_20220511105953.png


展开类结构全部内容就这点东西


微信图片_20220511110020.png


既然 CountDownLatch 是基于 AQS 实现的,那肯定也离不开对同步状态变量 state 的操作,我们在初始化的时候就将计数器的值赋值给了state


微信图片_20220511110106.png


另外,它可以多个线程同时获取,那一定是基于共享式获取同步变量的用法了,所以它需要通过重写下面两个方法控制同步状态变量 state :


  • tryAcquireShared()


  • tryReleaseShared()


CountDownLatch 暴露给使用者的只有 await()countDown() 两个方法,前者是阻塞自己,因为只有获取同步状态才会才会出现阻塞的情况,那自然是在 await() 的方法内部会用到 tryAcquireShared();有获取就要有释放,那后者 countDown() 方法内部自然是要用到 tryReleaseShared() 方法了


PS:如果你对上面这个很自然的推断理解有困难,强烈建议你看一下前序文章的铺垫,以防止知识断层带来的困扰


await()


先来看 await() 方法, 从方法签名上看,该方法会抛出 InterruptedException, 所以它是可以响应中断的,这个我们在 Java多线程中断机制 中明确说明过


public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}


其内部调用了同步器提供的模版方法 acquireSharedInterruptibly


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
      // 如果监测到中断标识为true,会重置标识,然后抛出 InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
      // 调用重写的 tryAcquireShared 方法,该方法结果如果大于零则直接返回,程序继续向下执行,如果小于零,则会阻塞自己
    if (tryAcquireShared(arg) < 0)
          // state不等于0,则尝试阻塞自己
        doAcquireSharedInterruptibly(arg);
}


重写的 tryAcquireShared 方法非常简单, 就是判断同步状态变量 state 的值是否为 0, 如果为零 (子线程已经全部执行完毕)则返回1, 否则返回 -1


protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}


如果子线程没有全部执行完毕,则会通过 doAcquireSharedInterruptibly 方法阻塞自己,这个方法在 Java AQS共享式获取同步状态及Semaphore的应用分析 中已经仔细分析过了,这里就不再赘述了


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                  // 再次尝试获取同步装阿嚏,如果大于0,说明子线程全部执行完毕,直接返回
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
              // 阻塞自己
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


await() 方法的实现就是这么简单,接下来看看 countDown() 的实现原理


countDown()


public void countDown() {
    sync.releaseShared(1);
}


同样是调用同步器提供的模版方法 releaseShared


public final boolean releaseShared(int arg) {
      // 调用自己重写的同步器方法
    if (tryReleaseShared(arg)) {
          // 唤醒调用 await() 被阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}


重写的 tryReleaseShared 同样很简单


protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
          // 如果当前状态值为0,则直接返回 (1)
        if (c == 0)
            return false;
          // 使用 CAS 让计数器的值减1 (2)
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

代码 (1) 判断当前同步状态值,如果为0 则直接返回 false;否则执行代码 (2),使用 CAS 将计数器减1,如果 CAS 失败,则循环重试,最终返回 nextc == 0 的结果值,如果该值返回 true,说明最后一个线程已调用 countDown() 方法,然后就要唤醒调用 await() 方法被阻塞的线程,同样由于分析过 AQS 的模版方法 doReleaseShared 整个释放同步状态以及唤醒的过程,所以这里同样不再赘述了


仔细看CountDownLatch重写的 tryReleaseShared 方法,有一点需要和大家说明:


代码 (1) if (c == 0) 看似没什么用处,其实用处大大滴,如果没有这个判断,当计数器值已经为零了,其他线程再调用 countDown 方法会将计数器值变为负值


现在就差 await(long timeout, TimeUnit unit) 方法没介绍了


await(long timeout, TimeUnit unit)


public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}


该方法签名同样抛出 InterruptedException,意思可响应中断。它其实就是 await() 更完善的一个版本,简单来说就是


主线程设定等待超时时间,如果该时间内子线程没有执行完毕,主线程也会 直接返回


我们将上面的例子稍稍修改一下你就会明白(主线程超时时间设置为 2 秒,而子线程要 sleep 5 秒)


@Slf4j
public class CountDownLatchTimeoutExample {
   private static CountDownLatch countDownLatch = new CountDownLatch(2);
   public static void main(String[] args) throws InterruptedException {
      // 这里不推荐这样创建线程池,最好通过 ThreadPoolExecutor 手动创建线程池
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-1 执行完毕");
            //计数器减1
            countDownLatch.countDown();
         }
      });
      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-2 执行完毕");
            //计数器减1
            countDownLatch.countDown();
         }
      });
      log.info("主线程等待子线程执行完毕");
      log.info("计数器值为:" + countDownLatch.getCount());
      countDownLatch.await(2, TimeUnit.SECONDS);
      log.info("计数器值为:" + countDownLatch.getCount());
      log.info("主线程执行完毕");
      executorService.shutdown();
   }
}


运行结果如下:


微信图片_20220511110519.png


形象化的展示上述示例的运行过程


微信图片_20220511110543.png


小结


CountDownLatch 的实现原理就是这么简单,了解了整个实现过程后,你也许发现了使用 CountDownLatch 的一个问题:


计数器减 1 操作是 一次性的,也就是说当计数器减到 0, 再有线程调用 await() 方法,该线程会直接通过, 不会再起到等待其他线程执行结果起到同步的作用了


为了解决这个问题,贴心的 Doug Lea 大师早已给我们准备好相应策略 CyclicBarrier


微信图片_20220511110612.png


本来想将 CyclicBarrier 的内容放到下一个章节,但是 CountDownLatch 的内容着实有些少,不够解渴,另外有对比才有伤害,所以内容没结束,咱得继续看 CyclicBarrier


微信图片_20220511110638.png








目录
打赏
0
0
0
0
1
分享
相关文章
看到一个魔改线程池,面试素材加一!(中)
看到一个魔改线程池,面试素材加一!(中)
484 0
看到一个魔改线程池,面试素材加一!(中)
程序员教你用代码制作3d爱心跳动特效,正好拿去送给女神给她个惊喜
使用HTML、CSS和JavaScript实现了一个三维网格采样器`MeshSurfaceSampler`,适用于任意浏览器,推荐谷歌。代码创建了一个类,从缓冲几何体的三角形网格中进行随机采样。提供了设置权重属性、构建分布和自定义随机数生成器的功能。用户只需将代码复制到文本文档并保存为HTML文件,即可运行。适合编程爱好者尝试,也可分享给他人。
262 1
|
8月前
|
带你手搓阻塞队列——自定义实现
带你手搓阻塞队列——自定义实现
99 0
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
求求你不要写满屏的 try...catch 了,这才是优雅的处理方式,真香...
求求你不要写满屏的 try...catch 了,这才是优雅的处理方式,真香...
284 0
求求你不要写满屏的 try...catch 了,这才是优雅的处理方式,真香...
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(下)
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(下)
CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了(下)
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)(一)
AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。
118 0
看到一个魔改线程池,面试素材加一!(下)
看到一个魔改线程池,面试素材加一!(下)
160 0
看到一个魔改线程池,面试素材加一!(下)
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等