JUC并发编程学习(八)-CountDownLatch、CyclicBarrier、Semaphore的使用

简介: JUC并发编程学习(八)-CountDownLatch、CyclicBarrier、Semaphore的使用

常用辅助类

CountDownLatch减法计数器

CountDownLatch:减计数器,使用场景:

一间教室有6名学生,放学后一名学生负责锁门。只有等教室里人走完了,才会把门锁上。那怎么有效精准的控制什么时候锁门,使用CountDownLatch减法计数器有效控制。

20200401134307494.png

CountDownLatch主要用法:


CountDownLatch downLatch=new CountDownLatch(6); //设置计数器初始值

countDownLatch.countDown(); 出去一人(产生一个线程)计数器就 -1

countDownLatch.await(); 阻塞等待计数器归零

代码demo

package com.jp.countdownlatchdemo;
import java.util.concurrent.CountDownLatch;
/**
 * @className:
 * @PackageName: com.jp.countdownlatchdemo
 * @author: youjp
 * @create: 2020-05-17 14:33
 * @description:    TODO Juc常用工具类:CountDownLatch 减法计数器的学习
 * @Version: 1.0
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch downLatch=new CountDownLatch(6);//初始值6人,有6人需要出教室
        for (int i = 1; i <=6 ; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"线程出教室一人");
                downLatch.countDown();  //每出一个人,计数器就减1
            },String.valueOf(i)).start();
        }
        try {
            downLatch.await();  //等待阻塞计数器归零,也就是6人全部都出教室以后
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"线程:学生全部出教室了,可以锁门了");
    }
}

运行结果:

20200401134307494.png

使用了CountDownLatch减法计数器,确保了教室无人的情况才可以锁门。

CyclicBarrier加法计数器

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

CyclicBarrier提供两个构造方法CyclicBarrier(int parties)和CyclicBarrier(int parties, Runnable barrierAction):

 /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

CyclicBarrier构造方法

  • CyclicBarrier(int parties)

默认构造方法,参数表示拦截的线程数量。

  • CyclicBarrier(int parties, Runnable barrierAction)

由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。


创建CyclicBarrier后,每个线程调用await方法告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。接下来我们来看看await方法的具体实现。

  • await实现

CyclicBarrier同样提供带超时时间的await和不带超时时间的await:

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

整个await方法的核心是dowait方法的调用,我们来看看dowait的具体实现。

  • dowait的前段部分
/**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    }

在dowait的前段部分,主要完成了当所有线程都到达同步点(barrier)时,唤醒所有的等待线程,一起往下继续运行,可根据参数barrierAction决定优先执行的线程。

  • 在dowait的后段部分
  // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }

在dowait的实现后半部分,主要实现了线程未到达同步点(barrier)时,线程进入Condition自旋等待,直到等待超时或者所有线程都到达barrier时被唤醒。

  • 整个dowait

使用ReentrantLock保证每一次操作线程安全;

线程等待/唤醒使用Lock配合Condition来实现;

线程被唤醒的条件:等待超时或者所有线程都到达barrier。

到这里为止,CyclicBarrier的重要实现源码分析就结束了,接下来还是照样给出一个具体的使用案例,方便掌握CyclicBarrier的具体用法。

使用案例

集齐7颗龙珠后,才能进行召唤神龙许愿。

package com.jp.CyclicBarrierDemo;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * @className:
 * @PackageName: com.jp.CyclicBarrierDemo
 * @author: youjp
 * @create: 2020-05-17 15:10
 * @description:    TDOO 加法计数器的学习
 * @Version: 1.0
 */
public class CycliBarrierDemo {
    public static void main(String[] args) {
        //1.创建CycliBarrier对象
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7, new Runnable() {
            @Override
            public void run() {
                System.out.println("集齐7颗龙珠了,神龙召唤成功");
            }
        });
        for (int i = 1; i <=7; i++) {
            final int temp=i;
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"收集到第"+temp+"颗龙珠");
                    try {
                        cyclicBarrier.await();  //未满足7颗条件,等待阻塞
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                },String.valueOf(i)).start();
        }
    }
}

执行结果

20200401134307494.png

CountDownLatch、CyclicBarrier区别

CountdownLatch阻塞主线程,等所有子线程完结了再继续下去。Syslicbarrier阻塞一组线程,直至某个状态之后再全部同时执行,并且所有线程都被释放后,还能通过reset来重用。


CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。

CyclicBarrier只能唤起一个任务,CountDownLatch可以唤起多个任务。

CyclicBarrier可重用,CountDownLatch不可重用,计数值为0该CountDownLatch就不可再用了

Semaphore信号量

Semaphore(信号量)用于控制同时访问特定资源的线程数量,以保证合理的使用公共资源。适用场景:限制资源,如抢位置、限流等。

工作原理

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

Semaphore主要方法:

void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

void release():释放一个许可,将其返回给信号量。

int availablePermits():返回此信号量中当前可用的许可数。

boolean hasQueuedThreads():查询是否有线程正在等待获取。

示例

package com.jp.semaphoreDemo;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * @className:
 * @PackageName: com.jp.semaphoreDemo
 * @author: youjp
 * @create: 2020-05-17 16:37
 * @description:    TODO 信号量demo: 5辆车使用3个停车位,每个停车位停靠10秒钟
 * @Version: 1.0
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //资源:3个停车位
        Semaphore semaphore=new Semaphore(3);
        for (int i = 1; i <=5; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();   //等待获取许可,占用资源
                    System.out.println(Thread.currentThread().getName()+"号车获取到了停车位");
                    TimeUnit.SECONDS.sleep(10); //抢到车位后,这这车位上停靠10秒中
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();//释放资源
                    System.out.println(Thread.currentThread().getName()+"号车离开了停车位");
                }
            },String.valueOf(i)).start();
        }
    }
}

运行结果:

20200401134307494.png

有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~


相关文章
|
22天前
|
Java 数据库 开发者
CountDownLatch、CyclicBarrier和Semaphore原理和区别
CountDownLatch、CyclicBarrier和Semaphore
|
资源调度
JUC并发编程之同步器(Semaphore、CountDownLatch、CyclicBarrier、Exchanger、CompletableFuture)附带相关面试题
1.Semaphore(资源调度) 2.CountDownLatch(子线程优先) 3.CyclicBarrier(栅栏) 4.Exchanger(公共交换区) 5.CompletableFuture(异步编程)
138 0
|
消息中间件 JavaScript 小程序
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 下
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 下
|
JavaScript 小程序 Java
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上
JUC多线程:CountDownLatch、CyclicBarrier、Semaphore 同步器原理 上
|
Java API
Java并发之CountDownLatch
Java并发之CountDownLatch
158 1
Java并发之CountDownLatch
|
Java
Java多线程-CountDownLatch、Semaphone、CyclicBarrier入门
多线程CountDownLatch、Semaphone、CyclicBarrier讲解
172 1
Java多线程-CountDownLatch、Semaphone、CyclicBarrier入门
JUC并发编程——CountDownLatch&Semaphore&CyclicBarrier
JUC并发编程——CountDownLatch&Semaphore&CyclicBarrier
140 0
JUC并发编程——CountDownLatch&Semaphore&CyclicBarrier
|
NoSQL Java Redis
【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用(上)
【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用(上)
【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用(上)
|
Java
Java并发编程之CyclicBarrier
Java并发编程之CyclicBarrier
110 0
Java并发编程之CyclicBarrier