案例二
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("Exception", e); } log.info("{} continue", threadNum); } } // 输出 21:43:03.378 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready 21:43:04.376 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready 21:43:05.376 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready 21:43:05.382 [pool-1-thread-1] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:05.382 [pool-1-thread-2] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:05.382 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue 21:43:05.382 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:05.382 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue 21:43:05.382 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue 21:43:06.377 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready 21:43:06.377 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:06.377 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue 21:43:07.377 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 4 is ready 21:43:07.377 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:07.377 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 4 continue 21:43:08.378 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 5 is ready 21:43:08.378 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:08.378 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 5 continue 21:43:09.378 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 6 is ready 21:43:09.378 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:09.378 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 6 continue 21:43:10.379 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 7 is ready 21:43:10.379 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:10.379 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 7 continue 21:43:11.380 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 8 is ready 21:43:11.380 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:11.380 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 8 continue 21:43:12.380 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 9 is ready 21:43:12.380 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception 21:43:12.380 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 9 continue Process finished with exit code 0
分析 await(timeout,TimeUnit) 方法
在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
案例三
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } } // 输出 21:47:26.683 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready 21:47:27.682 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready 21:47:28.684 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready 21:47:29.683 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready 21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 is ready 21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - callback is running 21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 continue 21:47:30.683 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue 21:47:30.683 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue 21:47:30.683 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue 21:47:30.683 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue 21:47:31.684 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 is ready 21:47:32.684 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 is ready 21:47:33.685 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 is ready 21:47:34.685 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 is ready 21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 is ready 21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - callback is running 21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 continue 21:47:35.686 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 continue 21:47:35.686 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 continue 21:47:35.686 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 continue 21:47:35.686 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 continue Process finished with exit code 0
- 在案例一的基础上,无非新增了一个回调函数功能:在线程达到屏障的时候,优先执行该回调函数先。