1 CyclicBarrier
CyclicBarrier是让一组线程达到一个屏障(也叫做同步点),当这一组线程执行到达这个屏障(cyclicBarrier.await()代码处)时,这组线程才会继续往下执行。
CyclicBarrier比较适用于多线程计算的场景,当这些线程都执行到某一个预设地点以后,再执行另外的操作。例如,开多个线程批量处理数据,多所有数据都处理完成后再进行汇总分析的场景。
CyclicBarrier和之前介绍的CountDownLatch比较类似,他们的主要区别是:CountDownLatch计数器只能使用一次,而CyclicBarrier计数器可以通过reset方法重置。CyclicBarrier还提供了一些查看阻塞线程数量(getNumberWaiting),判断线程是否被阻塞(isBroken)的方法。
2 源码分析
1) 构造函数
函数parties参数指的是,等待多少个线程进入屏障点,即等待多少线程调用await方法,才算所有线程都达到屏障点。
函数barrierAction指的是当所有线程达到这个屏障以后,将执行此barrierAction的内容。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
2) await
有两个版本,都是挂起当前线程,直到所有线程都达到屏障点时再继续执行。第二个版本支持让线程等待一定的时间,如果到时间以后还有线程没有达到屏障点,那么让已经达到屏障点的线程继续执行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
3) dowait
屏障的核心逻辑都是在此方法中实现的。从以下代码中我们可以看到,他的主要处理逻辑(异常或者其他非主要逻辑忽略)如下:
- 首先检查是否所有线程都达到屏障状态了,如果是,那么执行构造函数第二个参数barrierAction执行的任务。
- 如果不是所有线程都达到屏障状态,那么当前线程挂起。
- 如果线程挂起时指定了挂起时间,那么当时间到以后,此线程被唤醒,接着唤醒此时已经达到屏障状态的线程。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3 示例
以下示例代码中,我们制定屏障数是3,所以当三个线程调用cyclicBarrier.await()时,所有挂起的线程将被唤醒,然后执行构造函数中指定的任务,当然被唤醒的线程也将继续执行。
public static void main(String[] args) {
int num = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num, ()->{
System.out.println("时间="+DateUtil.getCurrentTime()+" -回调线程- 工作线程到达屏障点后开始执行");
});
for (int i = 0; i < num; i++) {
String name = "T-"+i;
Thread thread = new Thread(() -> {
try {
doTask();
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 完成任务,进入屏障点,等待其他线程");
cyclicBarrier.await();
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 所有线程已进入屏障点,继续执行");
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
}
}
private static void doTask(){
Random random = new Random();
long time = random.nextInt(3000) ;
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
运行结果如下,符合上面的分析。
时间=11:36:52:893 -工作线程- thread=T-0 完成任务,进入屏障点,等待其他线程
时间=11:36:53:174 -工作线程- thread=T-1 完成任务,进入屏障点,等待其他线程
时间=11:36:53:434 -工作线程- thread=T-2 完成任务,进入屏障点,等待其他线程
时间=11:36:53:434 -回调线程- 工作线程到达屏障点后开始执行
时间=11:36:53:434 -工作线程- thread=T-0 所有线程已进入屏障点,继续执行
时间=11:36:53:434 -工作线程- thread=T-1 所有线程已进入屏障点,继续执行
时间=11:36:53:434 -工作线程- thread=T-2 所有线程已进入屏障点,继续执行