CountDownLatch,CyclicBarrier,Semaphore

简介: 在开发过程中我们常常遇到需要对多个任务进行汇总,比如报表,或者大屏显示,需要将所有接口的数据都 获取到后再进行汇总,如果使用同步的方式,那么会比较耗时,体验不好,所以我们使用多线程,但是使用多线程 只能异步的执行,有些接口响应比较快,有些比较慢,而返回结果之间又有依赖,这样就无法汇总了, 所以我们引入了CountDownLatch,它能让所有子线程全部执行完毕后主线程才会往下执行,如果子线程没有执行完毕 ,那么主线程将无法继续向下执行。

CountDownLatch


在开发过程中我们常常遇到需要对多个任务进行汇总,比如报表,或者大屏显示,需要将所有接口的数据都 获取到后再进行汇总,如果使用同步的方式,那么会比较耗时,体验不好,所以我们使用多线程,但是使用多线程 只能异步的执行,有些接口响应比较快,有些比较慢,而返回结果之间又有依赖,这样就无法汇总了, 所以我们引入了CountDownLatch,它能让所有子线程全部执行完毕后主线程才会往下执行,如果子线程没有执行完毕 ,那么主线程将无法继续向下执行。


例子:我们需要对三个接口的返回结果进行求和。


   模拟三个接口


public static Integer getOne(){
        return 1;
    }
    public static Integer getTwo(){
        return 2;
    }
    public static Integer getThree(){
        return 3;
    }


我们创建一个线程池和CountDownLatch,CountDownLatch构造函数参数我们传3,表示计数器为3


   static ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * CountDownLatch(3) , 构造函数参数为3,
     */
    static volatile CountDownLatch countDownLatch = new CountDownLatch(3);


main函数,我们将三个任务加入线程池中,并且调用了countDownLatch.countDown(),调用此方法后计数器减1,因为一开始我们设置的 计数器为3,而三个线程执行后,每个-1,此时计算器变为0,这时候主线程的await才会返回,主线程才会向下执行,如果我们将计算器设置为 10,三个线程-3,此时计算器为7,那么await将会一直阻塞,主线程则无法向下执行,所以一定要让计算器为0后才会向下执行,


public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<Integer> futureOne = executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName()+"  is over");
            Thread.sleep(2000);
            countDownLatch.countDown();
            return getOne();
        });
        Future<Integer> futureTwo = executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName()+"  is over");
            Thread.sleep(2000);
            //计数器-1
            countDownLatch.countDown();
            return getTwo();
        });
        Future<Integer> futureThree = executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName()+"  is over");
            Thread.sleep(2000);
            countDownLatch.countDown();
            System.out.println("count3 "+countDownLatch.getCount());
            return getThree();
        }); 
        //阻塞,等到计数器为0蔡往下执行
        countDownLatch.await();
        System.out.println("count  "+countDownLatch.getCount());
        System.out.println("child thread over , main thread start");
        Integer value1 = futureOne.get();
        Integer value2 = futureTwo.get();
        Integer value3 = futureThree.get();
        int total = value1 + value2 + value3;
        System.out.println("total  "+total);
    }


CyclicBarrier


CyclicBarrier和CountDownLatch很像,下面我们用它来实现CountDownLatch计数器功能, 代码如下,我们创建了一个CyclicBarrier,它的构造函数为parties和Runnable接口,parties表示计数器,Runnable表示parties 为0时执行的任务,我们再main函数中两个线程任务中执行后都进行了cyclicBarrier.await()操作,每进行一次,计数器parties值-1,两次 后parties为0,此时出发Runnable任务,


package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * TODO
 *
 * @author 刘牌
 * @version 1.0
 * @date 2021/9/4 0004 23:28
 */
public class CyclicBarrierTest2 {
    static ExecutorService executorService = Executors.newCachedThreadPool();
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println("task1 and task2 over , It's my turn");
        }
    });
    public static void main(String[] args) {
        executorService.submit(() -> {
            System.out.println("do task1");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        executorService.submit(() -> {
            System.out.println("do task2");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
}


再举一个CountDownLatch无法实现的功能,执行有三个任务,每个任务都有三个阶段,需要每个任务的阶段同时执行,再到下一个阶段,阶段一执行后 到阶段2,阶段2完成后再到阶段3,code如下。


package thread;
import java.util.concurrent.*;
/**
 * TODO
 *  来自《并发编程之美》例子
 * @author 刘牌
 * @version 1.0
 * @date 2021/9/4 0004 22:09
 */
public class CyclicBarrierTest {
    static ExecutorService executorService = Executors.newCachedThreadPool();
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    public static void main(String[] args) {
        executorService.submit(() -> {
            System.out.println("task1");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task2");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task3");
        });
        executorService.submit(() -> {
            System.out.println("task1");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task2");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task3");
        });
        executorService.submit(() -> {
            System.out.println("task1");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task2");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("task3");
        });
    }
}


Semaphore


Semaphore即信号量机制,它和CountDownLatch,CyclicBarrier类似,都是计数器的思想, 构造函数permits表示计数器,在每个线程任务执行完毕时我们调用了semaphore.release(),那么permits的值将会+1 因为我们执行了连个线程任务,所以此时permits为2,semaphore.acquire(2)处则满足条件,主线程将往下执行,如果改为 semaphore.acquire(3),那么主线程将会一直阻塞,因为计数器为2.


package thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * TODO
 *信号量机制
 * @author 刘牌
 * @version 1.0
 * @date 2021/9/4 0004 21:36
 */
public class SemaphoreTest {
    static Semaphore semaphore = new Semaphore(0);
    static ExecutorService executorService = Executors.newCachedThreadPool();
    public static void main(String[] args) throws InterruptedException {
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName()+"  is over");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
        });
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName()+"  is over");
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
        });
        semaphore.acquire(2);
        System.out.println("all child thread is over,main thread is start");
        executorService.shutdown();
    }
}



目录
相关文章
|
17小时前
|
Java 测试技术
CountDownLatch、CyclicBarrier让线程听我号令
CountDownLatch、CyclicBarrier让线程听我号令
44 0
|
17小时前
CountDownLatch和CyclicBarrier你使用过吗?
CountDownLatch和CyclicBarrier你使用过吗?
26 0
|
17小时前
|
存储 Java 数据库连接
线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
35 0
CountDownLatch和 CyclicBarrier的使用
CountDownLatch和 CyclicBarrier的使用
CountDownLatch、CyclicBarrier的使用(门栓)
CountDownLatch、CyclicBarrier的使用(门栓)
88 0
CountDownLatch&CyclicBarrier&Semaphore
本文将介绍一下CountDownLatch 、 CyclicBarrier 、 Semaphore这几个控制线程的类。
 CountDownLatch&CyclicBarrier&Semaphore
|
消息中间件
CountDownLatch&CyclicBarrier
CountDownLatch&CyclicBarrier
115 0
CountDownLatch&CyclicBarrier