CyclicBarrier概述
CyclicBarrier字面意思是可循环使用的线程屏障。
CyclicBarrier的功能和CountDownLatch功能有点相似。都能实现线程间相互等待,直到线程做完某些任务,唤醒等待线程。那么既然他们功能类似,提供一种解决方案不就行了吗,为什么还要再提供一个呢。原因是他们的侧重点其实还不一样。在CountDownLatch中我们把线程归类为两种,一类是工作线程,一类是阻塞线程。工作线程执行完任务,可以调用countDown()方法,释放共享锁,阻塞线程则是通过await()方法 自旋获取共享锁,注意这里工作线程是不会阻塞的。在CyclicBarrier中其实只有一类线程,那就是工作线程,假设有5个工作线程,工作线程执行完任务,会判断其他4个工作线程是否执行结束,如果还有线程没有执行完,那么工作线程会阻塞,等待其他线程结束完。当其他线程都执行完了才会执行下一步,CyclicBarrier会阻塞工作线程。而且CyclicBarrier内部使用的是ReentrantLock和Condition。我们知道Condition可以让线程阻塞,并且放入到Condition的单链表中。
public class CyclicBarrierUsage { public static void main(String[] args) { int N = 5;//一共有五个线程,如果线程都执行完了 CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new Runnable() {// @Override public void run() { System.out.println(Thread.currentThread().getName() + " --- after run "); } }); for (int i = 0; i < N; i++) {//五个工作线程 new Thread() { @Override public void run() { super.run(); try { TimeUnit.SECONDS.sleep(1);//模拟工作 System.out.println(Thread.currentThread().getName() + " --- completed "); cyclicBarrier.await();//执行完工作 等待其他线程完成 System.out.println(Thread.currentThread().getName() + " --- run again "); } catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } }.start(); } } }
输出结果
Thread-2 — completed
Thread-1 — completed
Thread-0 — completed
Thread-4 — completed
Thread-3 — completed
Thread-3 — after run
Thread-3 — run again
Thread-2 — run again
Thread-0 — run again
Thread-4 — run again
Thread-1 — run again
构建CyclicBarrier的时候我们往构造函数传递了一个Runnable对象,这个Runnable会在线程屏障点到达的时候,被刚好到达屏障点的那个线程执行。所以它是在工作线程中执行的。如果我们在Android开发中,需要在UI线程中执行Runnable还需要转到UI线程执行
源码解析
构造函数
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties;//线程个数 this.count = parties; this.barrierCommand = barrierAction;//到达屏障点 需要执行的任务 } public CyclicBarrier(int parties) { this(parties, null); }
成员变量
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();
通过成员变量我们可以知道内部使用的就是ReentrantLock和Condition
await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
接下来我们详细讲解下dowait(),基本上精华都在里面了
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//独占锁上锁 try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count;//线程获取到锁了,count-1 if (index == 0) { // index=0表示所有线程都执行过了,触发屏障 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();//执行Runnable ranAction = true; nextGeneration();//nextGeneration会调用trip.signalAll(),唤醒所有等待在trip上的线程 return 0; } finally { if (!ranAction)//如果抛异常了 唤醒所有等待在trip上的线程 breakBarrier(); } } // 如果index!=0表示还有其他线程没有执行过,那么调用trip.await(),让当前线程阻塞 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }