Java并发编程笔记之CyclicBarrier源码分析

简介: JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。

JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。那么 CyclicBarrier 内部的实现与 CountDownLatch 有何不同那?

  CounDownLatch在解决多个线程同步方面相对于调用线程的 join 已经提供了不少改进,但是CountDownLatch的计数器是一次性的,也就是等到计数器变为0后,再调用CountDownLatch的await ()和countDown()方法都会立刻返回,这就起不到线程同步的效果了。CyclicBarrier类的功能不限于CountDownLatch所提供的功能,从字面意思理解CyclicBarrier是回环屏障的意思,它可以实现让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕之后,重置CyclicBarrier的状态后可以被重用。下图演示了这一过程。

一.CyclicBarrier的实现原理

  为了能一览CyclicBarrier的架构设计,下面先看下CyclicBarrier的类图,如下图:

如上面类图,可以知道CyclicBarrier 内部并不是直接使用AQS实现,而是使用了独占锁ReentrantLock来实现的同步;parties用来记录线程个数,用来表示需要多少线程先调用await后,所有线程才会冲破屏障继续往下运行;而 count 一开始等一parties,每当线程调用await方法后就递减 1 ,当为 0 的时候就表示所有线程都到了屏障点,另外你可能会疑惑为何维护parties 和 count 这两个变量,只有count 不就行了吗?别忘了CyclicBarries是可以被复用的,使用两个变量原因是用parties始终来记录总的线程个数,当count计数器变为 0 后,会使用parties 赋值给count,已达到复用的作用。这两个变量是在构造CyclicBarries对象的时候传递的,源码如下:

这里还有一个变量barrierConmmand也通过构造函数传递而来,这是一个任务,这个任务的执行时机是当所有线程都达到屏障点后。另外CyclicBarrier内部使用独占锁Lock来保证同时只有一个线程调用await方法时候才可以返回,使用lock首先保证了更新计数器count 的原子性,另外使用lock的条件变量 trip 支持了 线程间使用 notify,await 操作进行同步。

最后变量generation内部就一个变量broken用来记录当前屏障是否被打破,另外注意这里broken并没有被声明为volatile ,这是因为锁内使用变量不需要。源码如下:


 private static class Generation {
        boolean broken = false;
 }


 

接下来重点看一下CyclicBarrier的几个重要的函数,如下:

  1.int await() 当前线程调用 CyclicBarrier 的该方法时候,当前线程会被阻塞,知道满足下面条件之一才会返回:(1)parties 个线程都调用了 await()方法,也就是线程都到了屏障点。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException 异常返回。(3)当前屏障点关联的Generation对象的broken标志被设置为true的时候,会抛出 BrokenBarrierException 异常。源码如下:


public int await() throws InterruptedException, BrokenBarrierException {
   try {
       return dowait(false, 0L);
   } catch (TimeoutException toe) {
       throw new Error(toe); // cannot happen
   }
}


正如上面代码可以知道内部调用了dowait 方法,第一个参数false说明不设置超时时间,这时候第二个参数没有意义。

 

  2.boolean await(long timeout, TimeUnit unit) 当前线程调用 CyclicBarrier 的该方法时候当前线程会被阻塞,直到满足下面条件之一才会返回: (1) parties 个线程都调用了 await() 函数,也就是线程都到了屏障点,这时候返回 true。 (2) 当设置的超时时间到了后返回 false (3) 其它线程调用了当前线程的 interrupt()方法中断了当前线程,则当前线程会抛出 InterruptedException 异常返回。 (4) 当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时候,会抛出 BrokenBarrierException 异常。源码如下:


public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
}


正如上面代码可以知道内部调用了dowait 方法,第一个参数true说明设置超时时间,这时候第二个参数是超时时间。

 

  3.int dowait(boolean timed, long nanos) 该方法是实现 CyclicBarrier 的核心功能,源码如下:


private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             ...

            //(1)如果index==0说明所有线程都到到了屏障点,则执行初始化时候传递的任务
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //(2)执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //(3)激活其它调用await而被阻塞的线程,并重置CyclicBarrier
                    nextGeneration();
                    //返回
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // (4)如果index!=0
            for (;;) {
                try {
                     //(5)没有设置超时时间,
                     if (!timed)
                        trip.await();
                    //(6)设置了超时时间
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    ...
                }
                    ...
            }
        } finally {
            lock.unlock();
        }
  }

   private void nextGeneration() {
       //(7)唤醒条件队列里面阻塞线程
       trip.signalAll();
       //(8) 重置CyclicBarrier
       count = parties;
       generation = new Generation();
    }


上面代码是dowait方法的主干代码,当一个线程调用了dowait方法后首先会获取独占锁lock,如果创建CyclicBarrier的时候传递的参数为 10 ,那么后面 9 个调用线程会被阻塞;然后当前获取线程对计数器count进行递减操作,递减后的count = index = 9 ,因为 index != 0 ,所以当前线程会执行代码(4)。如果是无参数的当前线程调用的是无参数的await()方法,则这里 timed = false,所以当前线程会被放入条件变量trip的阻塞队列,当前线程会被挂起并释放获取的Lock锁;如果调用的有参数的await 方法 则timed = true,则当前线程也会被放入条件变量阻塞队列并释放锁的资源,但是不同的是当前线程会在指定时间超时后自动激活。

当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的 9 个线程中有一个会竞争到lock锁,然后执行第一个线程同样的操作,直到最后一个线程获取到lock的时候,已经有 9 个线程被放入了Lock 的条件队列里面,最后一个线程 count 递减后,count = index 等于 0 ,所以执行代码(2),如果创建CyclicBarrier的时候传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他 9 个线程,并重置CyclicBarrier,然后这 10个线程就可以继续向下执行了。

 

到目前位置理解了CyclicBarrier的原理后,接下来用几个例子来加深对CyclicBarrier的理解,下面例子我们要实现的是使用两个线程去执行一个被分解的任务 A,当两个线程把自己的任务都执行完毕后在对它们的结果进行汇总处理。例子如下:


package com.hjc;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by cong on 2018/7/7.
 */
public class CyclicBarrierTest1 {

    // 创建一个CyclicBarrier实例,添加一个所有子线程全部到达屏障后执行的一个任务
    private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread() + " task1 merge result");
        }
    });

    public static void main(String[] args) throws InterruptedException {

        //创建一个线程个数固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() + " task1-1");

                    System.out.println(Thread.currentThread() + " enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " enter out barrier");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " task1-2");

                    System.out.println(Thread.currentThread() + " enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " enter out barrier");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

如上代码创建了一个 CyclicBarrier 对象,第一个参数为计数器初始值,第二个参数 Runable 是指当计数器为 0 时候需要执行的任务。main 函数里面首先创建了固定大小为 2 的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用 await 方法。

一开始计数器为 2,当第一个线程调用 await 方法时候,计数器会递减为 1,由于计数器不为 0,所以当前线程就到了屏障点会被阻塞,然后第二个线程调用 await 时候,会进入屏障,计数器也会递减现在计数器为 0,就会去执行在 CyclicBarrier 构造时候的任务,执行完毕后就会退出屏障点,并且会唤醒被阻塞的第一个线程,这时候第一个线程也会退出屏障点继续向下运行。

上面的例子说明了多个线程之间是相互等待的,假如计数器为 N,那么调用 await 方法的前面 N-1 的线程都会因为到达屏障点被阻塞,当第 N 个线程调用 await 后,计数器为 0 了,这时候第 N 个线程才会发出通知唤醒前面的 N-1 个线程。也就是全部线程达到屏障点时候才能一块继续向下执行,对与这个例子来说使用 CountDownLatch 也可以达到类似输出结果。

 

下面在放个例子来说明 CyclicBarrier 的可复用性。

假设一个任务由阶段 1、阶段 2、阶段 3 组成,每个线程要串行的执行阶段 1 和 2 和 3,多个线程执行该任务时候,必须要保证所有线程的阶段 1 全部完成后才能进行阶段 2 执行,所有线程的阶段 2 全部完成后才能进行阶段 3 执行,下面使用 CyclicBarrier 来完成这个需求。例子如下:


package com.hjc;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by cong on 2018/7/7.
 */
public class CyclicBarrierTest2 {

    // 创建一个CyclicBarrier实例
    private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step3");

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() +  " step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step3");

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        //关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

如上代码,在每个子线程执行完 step1 后都调用了 await 方法,所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程完成了 step1 后才会开始执行 step2,然后在 step2 后面调用了 await 方法,这保证了所有线程的 step2 完成后,线程才能开始 step3 的执行,这个功能使用单个 CountDownLatch 是无法完成的。

目录
相关文章
|
1天前
|
Java 开发者
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案。本文通过技术综述及示例代码,剖析两者在性能上的差异。if-else具有短路特性,但条件增多时JVM会优化提升性能;switch则利用跳转表机制,在处理大量固定选项时表现出色。通过实验对比可见,switch在重复case值处理上通常更快。尽管如此,选择时还需兼顾代码的可读性和维护性。理解这些细节有助于开发者编写出既高效又优雅的Java代码。
6 2
|
1天前
|
Java 开发者
在Java编程的广阔天地中,if-else与switch语句犹如两位老练的舵手,引领着代码的流向,决定着程序的走向。
在Java编程中,if-else与switch语句是条件判断的两大利器。本文通过丰富的示例,深入浅出地解析两者的特点与应用场景。if-else适用于逻辑复杂的判断,而switch则在处理固定选项或多分支选择时更为高效。从逻辑复杂度、可读性到性能考量,我们将帮助你掌握何时选用哪种语句,让你在编程时更加得心应手。无论面对何种挑战,都能找到最适合的解决方案。
5 1
|
1天前
|
搜索推荐 Java 程序员
在Java编程的旅程中,条件语句是每位开发者不可或缺的伙伴,它如同导航系统,引导着程序根据不同的情况做出响应。
在Java编程中,条件语句是引导程序根据不同情境作出响应的核心工具。本文通过四个案例深入浅出地介绍了如何巧妙运用if-else与switch语句。从基础的用户登录验证到利用switch处理枚举类型,再到条件语句的嵌套与组合,最后探讨了代码的优化与重构。每个案例都旨在帮助开发者提升编码效率与代码质量,无论是初学者还是资深程序员,都能从中获得灵感,让自己的Java代码更加优雅和专业。
5 1
|
1天前
|
Java
在Java编程的广阔天地中,条件语句是控制程序流程、实现逻辑判断的重要工具。
在Java编程中,if-else与switch作为核心条件语句,各具特色。if-else以其高度灵活性,适用于复杂逻辑判断,支持多种条件组合;而switch在多分支选择上表现优异,尤其适合处理枚举类型或固定选项集,通过内部跳转表提高执行效率。两者各有千秋:if-else擅长复杂逻辑,switch则在多分支选择中更胜一筹。理解它们的特点并在合适场景下使用,能够编写出更高效、易读的Java代码。
5 1
|
1天前
|
缓存 负载均衡 安全
|
3天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
3天前
|
设计模式 算法 Java
Java编程中的设计模式:简化复杂性的艺术
在Java的世界中,设计模式如同一位智慧的导师,指引着开发者们在复杂的编码迷宫中找到出口。本文将深入浅出地探讨几种常见的设计模式,通过实例演示如何在Java项目实践中运用这些模式,从而提升代码的可维护性和扩展性。无论你是新手还是资深开发者,这篇文章都将为你打开一扇通往高效编码的大门。
12 1
|
4天前
|
Java 程序员 调度
深入浅出Java多线程编程
Java作为一门成熟的编程语言,在多线程编程方面提供了丰富的支持。本文将通过浅显易懂的语言和实例,带领读者了解Java多线程的基本概念、创建方法以及常见同步工具的使用,旨在帮助初学者快速入门并掌握Java多线程编程的基础知识。
4 0
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19531 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3106 0