CountDownLatch
在开发过程中我们常常遇到需要对多个任务进行汇总,比如报表,或者大屏显示,需要将所有接口的数据都 获取到后再进行汇总,如果使用同步的方式,那么会比较耗时,体验不好,所以我们使用多线程,但是使用多线程 只能异步的执行,有些接口响应比较快,有些比较慢,而返回结果之间又有依赖,这样就无法汇总了, 所以我们引入了CountDownLatch,它能让所有子线程全部执行完毕后主线程才会往下执行,如果子线程没有执行完毕 ,那么主线程将无法继续向下执行。
例子:我们需要对三个接口的返回结果进行求和。
模拟三个接口
public static Integer getOne(){ return 1; } public static Integer getTwo(){ return 2; } public static Integer getThree(){ return 3; }
我们创建一个线程池和CountDownLatch,CountDownLatch构造函数参数我们传3,表示计数器为3
static ExecutorService executorService = Executors.newCachedThreadPool(); /** * CountDownLatch(3) , 构造函数参数为3, */ static volatile CountDownLatch countDownLatch = new CountDownLatch(3);
main函数,我们将三个任务加入线程池中,并且调用了countDownLatch.countDown(),调用此方法后计数器减1,因为一开始我们设置的 计数器为3,而三个线程执行后,每个-1,此时计算器变为0,这时候主线程的await才会返回,主线程才会向下执行,如果我们将计算器设置为 10,三个线程-3,此时计算器为7,那么await将会一直阻塞,主线程则无法向下执行,所以一定要让计算器为0后才会向下执行,
public static void main(String[] args) throws InterruptedException, ExecutionException { Future<Integer> futureOne = executorService.submit(() -> { System.out.println(Thread.currentThread().getName()+" is over"); Thread.sleep(2000); countDownLatch.countDown(); return getOne(); }); Future<Integer> futureTwo = executorService.submit(() -> { System.out.println(Thread.currentThread().getName()+" is over"); Thread.sleep(2000); //计数器-1 countDownLatch.countDown(); return getTwo(); }); Future<Integer> futureThree = executorService.submit(() -> { System.out.println(Thread.currentThread().getName()+" is over"); Thread.sleep(2000); countDownLatch.countDown(); System.out.println("count3 "+countDownLatch.getCount()); return getThree(); }); //阻塞,等到计数器为0蔡往下执行 countDownLatch.await(); System.out.println("count "+countDownLatch.getCount()); System.out.println("child thread over , main thread start"); Integer value1 = futureOne.get(); Integer value2 = futureTwo.get(); Integer value3 = futureThree.get(); int total = value1 + value2 + value3; System.out.println("total "+total); }
CyclicBarrier
CyclicBarrier和CountDownLatch很像,下面我们用它来实现CountDownLatch计数器功能, 代码如下,我们创建了一个CyclicBarrier,它的构造函数为parties和Runnable接口,parties表示计数器,Runnable表示parties 为0时执行的任务,我们再main函数中两个线程任务中执行后都进行了cyclicBarrier.await()操作,每进行一次,计数器parties值-1,两次 后parties为0,此时出发Runnable任务,
package thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * TODO * * @author 刘牌 * @version 1.0 * @date 2021/9/4 0004 23:28 */ public class CyclicBarrierTest2 { static ExecutorService executorService = Executors.newCachedThreadPool(); static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("task1 and task2 over , It's my turn"); } }); public static void main(String[] args) { executorService.submit(() -> { System.out.println("do task1"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.submit(() -> { System.out.println("do task2"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } }
再举一个CountDownLatch无法实现的功能,执行有三个任务,每个任务都有三个阶段,需要每个任务的阶段同时执行,再到下一个阶段,阶段一执行后 到阶段2,阶段2完成后再到阶段3,code如下。
package thread; import java.util.concurrent.*; /** * TODO * 来自《并发编程之美》例子 * @author 刘牌 * @version 1.0 * @date 2021/9/4 0004 22:09 */ public class CyclicBarrierTest { static ExecutorService executorService = Executors.newCachedThreadPool(); static CyclicBarrier cyclicBarrier = new CyclicBarrier(3); public static void main(String[] args) { executorService.submit(() -> { System.out.println("task1"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task2"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task3"); }); executorService.submit(() -> { System.out.println("task1"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task2"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task3"); }); executorService.submit(() -> { System.out.println("task1"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task2"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("task3"); }); } }
Semaphore
Semaphore即信号量机制,它和CountDownLatch,CyclicBarrier类似,都是计数器的思想, 构造函数permits表示计数器,在每个线程任务执行完毕时我们调用了semaphore.release(),那么permits的值将会+1 因为我们执行了连个线程任务,所以此时permits为2,semaphore.acquire(2)处则满足条件,主线程将往下执行,如果改为 semaphore.acquire(3),那么主线程将会一直阻塞,因为计数器为2.
package thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * TODO *信号量机制 * @author 刘牌 * @version 1.0 * @date 2021/9/4 0004 21:36 */ public class SemaphoreTest { static Semaphore semaphore = new Semaphore(0); static ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) throws InterruptedException { executorService.submit(() -> { System.out.println(Thread.currentThread().getName()+" is over"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); }); executorService.submit(() -> { System.out.println(Thread.currentThread().getName()+" is over"); try { Thread.sleep(7000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); }); semaphore.acquire(2); System.out.println("all child thread is over,main thread is start"); executorService.shutdown(); } }