【线程】用 CyclicBarrier 实现线程同步

简介: 在玩游戏的时候我们经常遇到,我的进度条已经到了100%,但是却一直进不了游戏,这就是因为需要相互等待,等所有人准备完毕,才能开始。

前言

大家好,我是小郭,上一篇主要使用了CountDownLatch进行并发流程的控制,但是有另外一个工具它也能实现并发的控制,那就是CyclicBarrier,在玩游戏的时候我们经常遇到,我的进度条已经到了100%,但是却一直进不了游戏,这就是因为需要相互等待,等所有人准备完毕,才能开始。

了解CyclicBarrier

CyclicBarrier 也是一种多线程并发控制的实用工具,和 CountDownLatch 一样具有等待计数的功能,但是相比于 CountDownLatch 功能更加强大,CountDownLatch是一次性的。

主要作用使用它进行线程的同步,进行计数循环、重置。

可以理解为,去火锅店吃火锅,吃完一桌走人,再重新来一桌人。

思考问题

  1. CountDownLatch和CyclicBarrier有什么区别?
  2. CyclicBarrier适用于什么场景?
  3. CyclicBarrier为什么可以循环使用?

主要参数与方法

public class CyclicBarrier {
    /** The lock for guarding barrier entry */
    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // 条件队列
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // 参与的线程数量
    private final int parties;
    /* The command to run when tripped */
    // 由最后一个进入 barrier 的线程执行的操作
    private final Runnable barrierCommand;
    /** The current generation */
    // 实例
    private Generation generation = new Generation();
    // 正在等待进入等待的线程数量
    private int count;
}
//将线程处于休眠状态,所有的线程到达临界点 
public int await() throws InterruptedException, BrokenBarrierException 
//更新状态,唤醒
all nextGeneration() 
//是否处于断开状态 
boolean isBroken() 
//获取临界点上的线程数 
int getNumberWaiting() 
//重置为初始状态 
reset()

构造方法

public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

核心方法

dowait

private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             // 生成一个实例,CyclicBarrier类存在一个内部类Generation,
             // 每一次使用的CycBarrier可以当成Generation的实例
            final Generation g = generation;
            // broken判断是否为断开,初始化为false
            if (g.broken)
                throw new BrokenBarrierException();
            // 判断线程是否终止
            if (Thread.interrupted()) {
                // 如果终止,broken设置为true,唤醒所有
                breakBarrier();
                throw new InterruptedException();
            }
            // 自减一
            int index = --count;
            // 都到达临界点,出发临界点任务
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    // 任务执行完毕,唤醒所有,broken设置为断开
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (; ; ) {
                try {
                    // timed默认为false,挂起当前任务
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                      // 断开屏障
                        breakBarrier();
                        throw ie;
                    } else {
                    // 中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                // 返回当前序号
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

CyclicBarrier为什么可以循环使用?

/**
 * 更新障碍流程的状态并唤醒所有人。
 * Updates state on barrier trip and wakes up everyone.
 * Called only while holding lock.
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

当count为0的时候,就会生成新的屏障,进行初始化的工作同时唤醒所有人

实践

同样还是利用四个人吃火锅在作为例子。通过调用await相互等待,到了临界点的时候出发临界点的线程任务。

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4,() ->{
    System.out.println("人到齐了,开吃!");
});
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    IntStream.range(0,16).forEach(i ->{
        executorService.submit(() ->{
            System.out.println(Thread.currentThread().getName() + "来吃火锅");
            try {
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "动筷子");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    });
    executorService.shutdown();
}

输出结果

pool-1-thread-1来吃火锅
pool-1-thread-4来吃火锅
pool-1-thread-3来吃火锅
pool-1-thread-2来吃火锅
人到齐了,开吃!
pool-1-thread-1来吃火锅
pool-1-thread-3来吃火锅
pool-1-thread-4来吃火锅
pool-1-thread-2来吃火锅
人到齐了,开吃!

场景:T1线程查询库存剩余数量,T2线程校验付款金额,T3线程完成扣减库存

下面是伪代码的实现

// 初始化数量和金额
private static final AtomicInteger num = new AtomicInteger(1);
private static final Integer money = 2;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,() ->{
    int count = num.decrementAndGet();
    System.out.println(Thread.currentThread().getName()+ "扣减库存完成,剩余库存" + count );
});
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    IntStream.range(0,10).forEach(i ->{
        executorService.submit(() ->{
            CheckAll();
        });
    });
    executorService.shutdown();
}
public static void CheckAll(){
    // 查询库存数量
    Thread t1 = new Thread(() ->{
        if (num.get() > 0){
            System.out.println("库存数量充足");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }else {
            System.out.println("库存不足");
        }
    });
    t1.start();
    System.out.println(t1.getName() + "启动");
    // 查询付款金额
    Thread t2 = new Thread(() ->{
        if (num.get() * money  > 0){
            System.out.println("付款金额正确");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }else {
            System.out.println("付款失败");
        }
    });
    t2.start();
    System.out.println(t2.getName() + "启动");
}

在这里我们需要注意,线程池大小为1是必要的,如果设置为多个,有可能会两个线程 A 和 B 同时查询,A 的订单先返回,B 的派送单先返回,造成队列中的数据不匹配,所以1个线程实现生产数据串行执行,保证数据安全。

对比一下CountDownLacth与CyclicBarrier

  1. CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,一起完成。
  2. CountDownLacth是一次性的,而CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。
  3. CyclicBarrier 可以设置回调函数
  4. 调用 CountDownLatch 的 countDown 方法后,当前线程并不会阻塞,会继续往下执行;而调用 CyclicBarrier 的 await 方法,会阻塞当前线程,直到 CyclicBarrier 指定的线程全部都到达了指定点的时候,才能继续往下执行
相关文章
|
10月前
|
消息中间件 安全 Linux
线程同步与IPC:单进程多线程环境下的选择与权衡
线程同步与IPC:单进程多线程环境下的选择与权衡
191 0
|
10月前
|
Java 测试技术
CountDownLatch、CyclicBarrier让线程听我号令
CountDownLatch、CyclicBarrier让线程听我号令
95 0
多线程学习之解决线程同步的实现方法
多线程学习之解决线程同步的实现方法
51 0
|
8月前
|
Java 开发者
Java面试题:请解释内存泄漏的原因,并说明如何使用Thread类和ExecutorService实现多线程编程,请解释CountDownLatch和CyclicBarrier在并发编程中的用途和区别
Java面试题:请解释内存泄漏的原因,并说明如何使用Thread类和ExecutorService实现多线程编程,请解释CountDownLatch和CyclicBarrier在并发编程中的用途和区别
86 0
|
5月前
|
安全 Java 开发者
在多线程编程中,确保数据一致性与防止竞态条件至关重要。Java提供了多种线程同步机制
【10月更文挑战第3天】在多线程编程中,确保数据一致性与防止竞态条件至关重要。Java提供了多种线程同步机制,如`synchronized`关键字、`Lock`接口及其实现类(如`ReentrantLock`),还有原子变量(如`AtomicInteger`)。这些工具可以帮助开发者避免数据不一致、死锁和活锁等问题。通过合理选择和使用这些机制,可以有效管理并发,确保程序稳定运行。例如,`synchronized`可确保同一时间只有一个线程访问共享资源;`Lock`提供更灵活的锁定方式;原子变量则利用硬件指令实现无锁操作。
53 2
|
7月前
|
Java
多线程线程同步
多线程的锁有几种方式
|
7月前
|
安全 Java
【多线程面试题 六】、 如何实现线程同步?
实现线程同步的方法包括同步方法、同步代码块、使用ReentrantLock、volatile关键字以及原子变量类,以确保线程安全和数据一致性。
|
9月前
|
API
java-多线程-CountDownLatch(闭锁) CyclicBarrier(栅栏) Semaphore(信号量)-
java-多线程-CountDownLatch(闭锁) CyclicBarrier(栅栏) Semaphore(信号量)-
56 1
|
8月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
103 0
|
8月前
|
Java 开发者
Java面试题:解释Java内存模型中的内存可见性,解释Java中的线程池(ThreadPool)的工作原理,解释Java中的CountDownLatch和CyclicBarrier的区别
Java面试题:解释Java内存模型中的内存可见性,解释Java中的线程池(ThreadPool)的工作原理,解释Java中的CountDownLatch和CyclicBarrier的区别
49 0

热门文章

最新文章