2.循环屏障的源码剖析
public class CyclicBarrier { // 每一代都会生成新的Generation private static class Generation { // broken标记,用来存放屏障是否被损坏 // 被损坏的屏障是不能被使用的 boolean broken = false; } /** 内部维护一个可重入锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 内部维护一个Condition */ private final Condition trip = lock.newCondition(); /** 屏障的最大容量 */ private final int parties; /* 冲破屏障后被执行的任务 */ private final Runnable barrierCommand; /** 生成当前轮的Generation */ private Generation generation = new Generation(); // 默认为最大的阻挡容量,每加入一个线程减1 // 与CountDownLatch类似 // 当屏障被冲破或重置,会将count重置为最大的阻挡容量 private int count; // 当屏障被冲破后,将调用该方法开启下一轮 private void nextGeneration() { // 唤醒所有等待中的线程 trip.signalAll(); // 重置count count = parties; //创建新的Generation对象 generation = new Generation(); } // 破坏当前的屏障,破坏后当前轮次的屏障就不能够再使用了 // 除非重置生成新代 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } // 开始等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { // 由于这里没有使用时间策略,因此如果出现超时,就是异常状况 throw new Error(toe); } } // 可超时的等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } // 真正的等待流程 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁 因为会有多个线程同时调用await方法, // 需要保证每次只有一个线程能进入 lock.lock(); try { final Generation g = generation; // 确定屏障未被破坏 if (g.broken) throw new BrokenBarrierException(); // 需要破坏屏障的第一种情况:线程中断 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 可以冲破屏障了 if (index == 0) { boolean ranAction = false; try { // 执行冲破屏障后的任务 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 更新代数 nextGeneration(); return 0; } finally { // 损坏屏障的第二种情况:执行任务异常 if (!ranAction) breakBarrier(); } } // 走到这说明加入的线程数量不够冲破屏障 for (;;) { // 无限循环,直到冲破屏障,超时或者出现异常 try { // 看看是否是限时的 if (!timed) // breakBarrier|nextGeneration会唤醒trip trip.await(); //非定时永久等待 else if (nanos > 0L) //定时等待指定时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 破坏屏障的第一种情况:中断 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //走到这说明trip被唤醒 // 即使被唤醒,但是屏障被损坏的情况还是需要抛异常 if (g.broken) throw new BrokenBarrierException(); // 代数有更新,说明进行了换代 // 返回,并带返回参数:当前是第几个等待的线程 if (g != generation) return index; // 等待超时,破坏屏障的第三种情况 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } public int getParties() { return parties; } public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
上面是通过内部类generation来实现屏障的更新迭代的,这种处理方式值得关注学习。
除此以外,上面的源码部分应该很好理解,这里就介绍到这里,下一篇文章将介绍并发工具类Semaphore和Exchanger,以及Fork/Join框架,这也会是并发编程基础篇的最后一篇,后面笔者还可能输出一些高阶内容