JUC并发编程——CountDownLatch&Semaphore&CyclicBarrier

简介: JUC并发编程——CountDownLatch&Semaphore&CyclicBarrier

正文


CountDownLatch


Java的concurrent包里面的CountDownLatch其实可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。


你可以向CountDownLatch对象设置一个初始的数字(大于0)作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程减为0为止。


CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。


CountDownLatch使用


package com.xiaojie.aqs;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 模拟多线程请求商品信息,等待所有的数据执行完毕之后,同时返回数据,
 * 看着有点类似 CyclicBarrier
 * @date 2022/1/9 0:00
 */
public class CountDownDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread t1 = new Thread(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("商品基本信息");
        }, "t1线程");
        t1.start();
        Thread t2 = new Thread(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("商品详情信息");
        }, "t2线程");
        t2.start();
        Thread t3 = new Thread(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("商品配送信息");
        }, "t3线程");
        t3.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("唤醒线程。。。。。。。");
        countDownLatch.countDown();
    }
}


CountDownLatch源码分析


以下源码分析基于JDK17,与JDK8有所不同,但思想是一致的


222.png


CountDownLatch有一个Sync的内部类,该类继承了AbstractQueuedSynchronizer(AQS),主要方法有


public void await() {} //阻塞当前线程
public boolean await(long timeout, TimeUnit unit)//有阻塞时间的阻塞方法
 public void countDown() {}//唤醒阻塞线程


构造函数方法解读


222.png


初始化CountDownLatch(int)这个数值必须是不能小于0的数值,当然可以为0,但是如果是0的话那就不能阻塞线程了,而没有实际意义。该值对应AQS中state的值。


await()方法解读


444.png

770259d1a417490293a94d9fc2494375.png


tryAcquireShared()方法解读


111.png


1处代码,表示如果state值为0则返回1如果不是0则为-1。而构造函数中这个数值只有大于0才有意义。


2处代码,表示将请求的线程放入AQS的等待队列中(双向链表


这个代码的意思就是,如果调用了await(),就将当前线程放进AQS的等待队列,让线程阻塞。


countDown()方法解读


333.png

222.png



tryReleaseShared(1)方法解读


111.png


如果要唤醒AQS队列中的线程,只需要唤醒最前面线程即可。上面的方法通过CAS修改状态值为0。


signalNext()方法最终会调用到LockSupport.unpark(s.waiter)方法

Semaphore


Semaphore是计数信号量。Semaphore管理一系列许可。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可这个对象,Semaphore只是维持了一个可获得许可证的数量。Semaphore应用场景,比如接口限流。


Semaphore使用


package com.xiaojie.aqs;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author xiaojie
 * @version 1.0
 * @description: Semaphore代码简单使用
 * @date 2022/1/9 14:20
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5, false);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i=0;i<10;i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire(); 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"running........");
            });
        }
        semaphore.release();  //释放一个资源
        executorService.shutdown();
    }
}


CyclicBarrier源码分析


  private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final CyclicBarrier.Generation g = generation;
            if (g.broken)
                //检查当前屏障是否被打破,如果破坏了抛出异常
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                //判断线程是否中断,如果中断,破坏栅栏执行trip.signalAll()方法唤醒所有的线程,并且抛出中断异常
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count; //计数器数值减1
            if (index == 0) {  // 如果减到0需要唤醒所有的线程并转换到下一代
                Runnable command = barrierCommand;
                if (command != null) {
                    try {
                        command.run();
                    } catch (Throwable ex) {
                        breakBarrier();
                        throw ex;
                    }
                }
                nextGeneration();//唤醒所有的线程并且进入下一代
                return 0;
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)//判断是否有等待超时时间
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos); //设置阻塞等待时间
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken) //打破栅栏抛出异常
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index; //如果线程因为换代操作而被唤醒返回计数器的值
                if (timed && nanos <= 0L) {//如果等待超时破坏栅栏,唤醒所有的线程
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//释放锁
        }
    }



说明:


private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();


CyclicBarrier是通过Condition来实现的,通过condition.signalAll()方法来唤醒线程,而Condition是基于单向链表来实现的,如果需要唤醒线程就将线程加入到AQS的等待队列(双向链表)中,以此来唤醒线程。

相关文章
|
9月前
|
资源调度
JUC并发编程之同步器(Semaphore、CountDownLatch、CyclicBarrier、Exchanger、CompletableFuture)附带相关面试题
1.Semaphore(资源调度) 2.CountDownLatch(子线程优先) 3.CyclicBarrier(栅栏) 4.Exchanger(公共交换区) 5.CompletableFuture(异步编程)
104 0
|
消息中间件 JavaScript 小程序
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 下
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 下
|
JavaScript 小程序 Java
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上
|
Java API
Java并发之CountDownLatch
Java并发之CountDownLatch
132 1
Java并发之CountDownLatch
|
安全 Java 调度
JUC并发编程学习(八)-CountDownLatch、CyclicBarrier、Semaphore的使用
JUC并发编程学习(八)-CountDownLatch、CyclicBarrier、Semaphore的使用
JUC并发编程学习(八)-CountDownLatch、CyclicBarrier、Semaphore的使用
|
安全
JUC CyclicBarrier和Semphore
JUC CyclicBarrier和Semphore
75 0
|
算法 Java 数据库
Java并发:同步工具类详解(CountDownLatch、CyclicBarrier、Semaphore)
Java并发:同步工具类详解(CountDownLatch、CyclicBarrier、Semaphore)
265 0