Juc并发编程15——循环屏障CyclickBarrier使用与源码剖析(下)

简介: 1.循环屏障的使用

2.循环屏障的源码剖析

public class CyclicBarrier {
   // 每一代都会生成新的Generation
    private static class Generation {
      // broken标记,用来存放屏障是否被损坏
      // 被损坏的屏障是不能被使用的
        boolean broken = false;
    }
    /** 内部维护一个可重入锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 内部维护一个Condition */
    private final Condition trip = lock.newCondition();
    /** 屏障的最大容量 */
    private final int parties;
    /* 冲破屏障后被执行的任务 */
    private final Runnable barrierCommand;
    /** 生成当前轮的Generation */
    private Generation generation = new Generation();
    // 默认为最大的阻挡容量,每加入一个线程减1
    // 与CountDownLatch类似
    // 当屏障被冲破或重置,会将count重置为最大的阻挡容量
    private int count;
   // 当屏障被冲破后,将调用该方法开启下一轮
    private void nextGeneration() {
        // 唤醒所有等待中的线程
        trip.signalAll();
        // 重置count
        count = parties;
        //创建新的Generation对象
        generation = new Generation();
    }
    // 破坏当前的屏障,破坏后当前轮次的屏障就不能够再使用了
    // 除非重置生成新代
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
   // 开始等待
  public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
          // 由于这里没有使用时间策略,因此如果出现超时,就是异常状况
            throw new Error(toe); 
        }
    }
    // 可超时的等待
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    // 真正的等待流程
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加锁 因为会有多个线程同时调用await方法,
        // 需要保证每次只有一个线程能进入
        lock.lock(); 
        try {
            final Generation g = generation;
      // 确定屏障未被破坏
            if (g.broken)
                throw new BrokenBarrierException();
      // 需要破坏屏障的第一种情况:线程中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            // 可以冲破屏障了
            if (index == 0) { 
                boolean ranAction = false;
                try {
                  // 执行冲破屏障后的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 更新代数
                    nextGeneration();
                    return 0;
                } finally {
                  // 损坏屏障的第二种情况:执行任务异常
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 走到这说明加入的线程数量不够冲破屏障
            for (;;) { // 无限循环,直到冲破屏障,超时或者出现异常
                try {
                  // 看看是否是限时的
                    if (!timed)
                      // breakBarrier|nextGeneration会唤醒trip
                        trip.await(); //非定时永久等待
                    else if (nanos > 0L) //定时等待指定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                  // 破坏屏障的第一种情况:中断
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
        //走到这说明trip被唤醒
        // 即使被唤醒,但是屏障被损坏的情况还是需要抛异常 
                if (g.broken)
                    throw new BrokenBarrierException();
        // 代数有更新,说明进行了换代
        // 返回,并带返回参数:当前是第几个等待的线程
                if (g != generation)
                    return index;
        // 等待超时,破坏屏障的第三种情况
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public int getParties() {
        return parties;
    }
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

上面是通过内部类generation来实现屏障的更新迭代的,这种处理方式值得关注学习。


除此以外,上面的源码部分应该很好理解,这里就介绍到这里,下一篇文章将介绍并发工具类Semaphore和Exchanger,以及Fork/Join框架,这也会是并发编程基础篇的最后一篇,后面笔者还可能输出一些高阶内容

相关文章
|
2月前
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。
|
2月前
|
Java
JAVA并发编程系列(9)CyclicBarrier循环屏障原理分析
本文介绍了拼多多面试中的模拟拼团问题,通过使用 `CyclicBarrier` 实现了多人拼团成功后提交订单并支付的功能。与之前的 `CountDownLatch` 方法不同,`CyclicBarrier` 能够确保所有线程到达屏障点后继续执行,并且屏障可重复使用。文章详细解析了 `CyclicBarrier` 的核心原理及使用方法,并通过代码示例展示了其工作流程。最后,文章还提供了 `CyclicBarrier` 的源码分析,帮助读者深入理解其实现机制。
|
6月前
|
设计模式 安全 Java
Java Review - 并发编程_独占锁ReentrantLock原理&源码剖析
Java Review - 并发编程_独占锁ReentrantLock原理&源码剖析
59 0
|
6月前
|
并行计算 安全 Java
CyclicBarrier(循环屏障)源码解读与使用
CyclicBarrier(循环屏障)源码解读与使用
|
6月前
|
缓存 算法 Java
JUC并发编程之CAS
CAS,即Compare and Swap,是一种并发编程中用于实现多线程环境下的原子操作的技术。它是一种无锁算法,用于解决多线程环境下的数据同步问题。CAS操作包含三个操作数:内存位置V,旧的预期值A和即将要写入的新值B。只有当内存位置的值与旧的预期值A相等时,才会将新值B写入内存位置V,否则不执行任何操作。CAS操作是原子的,保证了多线程环境下的数据一致性和线程安全性。
|
缓存 监控 安全
JUC并发编程之线程锁(一)
1.ReentrantLock(互斥锁)、2.ReentRantReaderWriterLock(互斥读写锁)、3.StampedLock(无障碍锁)、4.Condition(自定义锁)、5.LockSupport
75 0
|
设计模式 Java C++
Java Review - 并发编程_独占锁ReentrantLock原理&源码剖析(上)
Java Review - 并发编程_独占锁ReentrantLock原理&源码剖析(上)
113 0
Java Review - 并发编程_独占锁ReentrantLock原理&源码剖析(上)
|
SpringCloudAlibaba 安全 Java
JUC并发编程(二):线程相关知识点
实现编发编程的主要手段就是多线程。线程是操作系统里的一个概念。接下来先说说两者的定义、联系与区别。
77 0
深入理解JUC:第五章:CyclicBarrier循环栅栏
深入理解JUC:第五章:CyclicBarrier循环栅栏
114 0
深入理解JUC:第五章:CyclicBarrier循环栅栏
|
Java API
【JUC】循环屏障CyclicBarrier详解
【JUC】循环屏障CyclicBarrier详解
143 0
【JUC】循环屏障CyclicBarrier详解