【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用(上)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用(上)

前言

这三个类都是JDK5为我们提供的处理并发编程的工具。


CountDownLatch:是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。


CyclicBarrier:字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier可重用


Semaphore:Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可


CountDownLatch:闭锁


CountDownLatch也常常被我们称为闭锁,是JUC提供给我们算是比较常用的一个工具了。


最重要的三个方法如下:

public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { };  //将count值减1


如何实现的?


CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。


使用场景:


   1.实现最大并行性:注意不是并发,是并行。并行强调的是所有人同一时刻统一开始。比如:我们想测试一个单例是否有问题,用最大并行数的线程将很容易测试出来。比如:我们跑步,所有人必须在起跑线同一时刻听到枪声才能开跑的场景


   2.开始执行前等待n个线程完成各自任务:这种使用场景应该是最多的。比如:应用程序启动前要求其余所有组件都加载完毕。比如:SpringCloud的健康检查。比如:经典的“一家人一起吃个饭”场景


   3.死锁检测:一个非常方便的辅助测试的场景,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。


代码示例:


模拟SpringCloud的健康检查

image.png


这种服务级别的最终up或者down,可以并行的由各个部分去心跳请求决定,最终汇总结果即可。代码如下:


先可抽象出一个BaseHealthChecker:它抽象出所有健康检查的行为


/**
 * 基类:定义了检查的行为
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:52
 */
@Getter
public abstract class BaseHealthChecker implements Runnable {
    private String serviceName; //检查的名称:如网络检查 DB检查 redis检查等等
    private boolean serviceUp; //是否健康 up
    //闭锁应该是同一把 所以传进来
    private CountDownLatch latch;
    public BaseHealthChecker(String serviceName, CountDownLatch latch) {
        super();
        this.latch = latch;
        this.serviceName = serviceName;
        this.serviceUp = false;
    }
    @Override
    public void run() {
        try {
            verifyService();
            serviceUp = true;
        } catch (Throwable t) {
            t.printStackTrace(System.err);
            serviceUp = false;
        } finally {
            if (latch != null) {
                latch.countDown();
            }
        }
    }
    /**
     * 各调用者只需要去实现这个行为即可(此处需要注意:不要返回值  我会认为没有报错 就认为是健康的)
     */
    public abstract void verifyService();
}


检查网络的检查类:NetworkHealthChecker


/**
 * 检查网络
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:53
 */
public class NetworkHealthChecker extends BaseHealthChecker {
    public NetworkHealthChecker(CountDownLatch latch) {
        super("Network Service", latch);
    }
    //模拟网络检查  只要不抛出异常 就认为是up的
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}


检查DB、redis都类似DBHealthChecker :


/**
 * 检查DB数据库
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:53
 */
public class DBHealthChecker extends BaseHealthChecker {
    public DBHealthChecker(CountDownLatch latch) {
        super("Network Service", latch);
    }
    //模拟网络检查  只要不抛出异常 就认为是up的
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}


现在可以写一个main方法,来测凸显一下闭锁的功能了:


    //需要进行健康检查的所有的服务们
    private static List<BaseHealthChecker> services = new ArrayList<>();
    //闭锁
    private static CountDownLatch latch;
    private static boolean checkServiceIsUp() throws InterruptedException {
        //先必须要知道 一共有哪些服务是需要健康检查的
        latch = new CountDownLatch(3);
        services.add(new NetworkHealthChecker(latch));
        services.add(new DBHealthChecker(latch));
        services.add(new RedisHealthChecker(latch));
        //启动一个线程池  开多线程去同时健康检查
        Executor executor = Executors.newFixedThreadPool(services.size());
        for (final BaseHealthChecker v : services) {
            executor.execute(v);
        }
        latch.await();
        for (BaseHealthChecker v : services) {
            if (!v.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
    public static void main(String[] args) throws InterruptedException {
        boolean b = checkServiceIsUp();
        System.out.println("总体服务的up状态为:" + b);
    }
输出:
Checking Network Service
Checking DB Service
Checking Redis Service
Network Service is UP
DB Service is UP
Redis Service is UP
总体服务的up状态为:true


我们发现就这样很高效的实现了服务的健康检查,并且耗时由耗时最长的额决定。


Java8提供了基于流式处理的类似功能类:Completablefuture,推荐使用。具体参考我之前的博文:【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)


缺点


构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

相关文章
|
6月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
6月前
|
Java
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
前言 主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。 同步队列(CLH队列) 作用:管理需要获...
119 18
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
|
12月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
JAVA并发编程系列(7)Semaphore信号量剖析
腾讯T2面试,要求在3分钟内用不超过20行代码模拟地铁安检进站过程。题目设定10个安检口,100人排队,每人安检需5秒。实际中,这种题目主要考察并发编程能力,特别是多个线程如何共享有限资源。今天我们使用信号量(Semaphore)实现,限制同时进站的人数,并通过信号量控制排队和进站流程。并详细剖析信号量核心原理和源码。
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。
|
Java API
java多线程--信号量Semaphore的使用
  Semaphore可以控制某个共享资源可被同时访问的次数,即可以维护当前访问某一共享资源的线程个数,并提供了同步机制.例如控制某一个文件允许的并发访问的数量.   例如网吧里有100台机器,那么最多只能提供100个人同时上网,当来了第101个客人的时候,就需要等着,一旦有一个人人下机,就可以立马得到了个空机位补上去.
1198 0
|
22天前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
61 0
|
1月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
98 16
|
2月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。

热门文章

最新文章