前言
CyclicBarrier是用于多线程相互等待到达一个公共的屏障点的同步工具类,如下图,小明,小宝,小周都需要等待其他人到达公共的屏障点后,才能继续做其他事,常用于多个线程需要一起完成的任务。本质是通过Lock,控制一个state状态,原子性的更新到0。
CyclicBarrier使用的两种场景
1、 n个线程准备好了一些数据,然后调用await方法,主线程等他们都处理好了,可以汇总n个线程准备好的数据。
2、 n个线程等待都到齐了,再一个一个处理,await方法相当于签到,每个线程都调用await先签到,等到屏障数等于0,再处理后面的逻辑,最后签到的线程执行屏障并唤醒其它等待的线程。
CyclicBarrier与CountDownLatch区别
1、CyclicBarrier可以重置,而CountDownLatch不可以重置。
2、CountDownLatch是一个或多个线程等计数减少到0,一起开始做事。
CyclicBarrier是最后一个线程,唤醒其它等计数器为0的线程,开始做事。
3、CyclicBarrier是基于独占锁ReentrantLock独占锁实现,CountDownLatch是基于共享锁实现。
await方法实现
通过一个ReentrantLock,加上一个Condition,每调用await就会parties减少1。
如果parties=0,执行屏障barrierCommand。唤醒其它阻塞的线程继续执行。
如果parties不是0进入阻塞状态,等待被唤醒。
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加同一把互赤锁 lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); }//计数器自减 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //条件阻塞 等待计数器为0 最后一个减少为0的线程 将唤醒所有线程。 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration() { // 唤醒等待在trip条件(计数器大于0前被阻塞)的线程 trip.signalAll(); // 重置计数器 count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
CyclicBarrier实现是基于ReentrantLock实现的,代码比较简单,可以好好学习下。