【Semaphore、ReentrantLock、CountDownLatch、Cyclicbarrier、ReadWriteLock】多线程交替执行、顺序执行、同时执行、读写分离执行、生产者消费者

简介: 【Semaphore、ReentrantLock、CountDownLatch、Cyclicbarrier、ReadWriteLock】多线程交替执行、顺序执行、同时执行、读写分离执行、生产者消费者

一、交替执行


建立3个线程,完成交替执行,按照123123123…打印


1.1 使用Lock的ReentrantLock实现

public class ThreadPrint {
    public static void main(String[] args) {
        Print123 print123 = new Print123();
        new Thread(()->{
           while (true){
               print123.print1();
           }
        }).start();
        new Thread(()->{
            while (true){
                print123.print2();
            }
        }).start();
        new Thread(()->{
            while (true){
                print123.print3();
            }
        }).start();
    }
}
// 线程资源类
class Print123{
    private int number = 1;
    private Lock lock = new ReentrantLock();
    private Condition con1 = lock.newCondition();
    private Condition con2 = lock.newCondition();
    private Condition con3 = lock.newCondition();
    public void print1(){
        lock.lock();
        try{
            if (number!=1){
                con1.await();
            }
            System.out.println(1);
            Thread.sleep(1000);
            number = 2;
            con2.signal();
        }catch (Exception e){
            System.out.println("1发生异常!");
        }finally {
            lock.unlock();
        }
    }
    public void print2(){
        lock.lock();
        try{
            if (number!=2){
                con2.await();
            }
            System.out.println(2);
            Thread.sleep(1000);
            number = 3;
            con3.signal();
        }catch (Exception e){
            System.out.println("2发生异常!");
        }finally {
            lock.unlock();
        }
    }
    public void print3(){
        lock.lock();
        try{
            if (number!=3){
                con3.await();
            }
            System.out.println(3);
            Thread.sleep(1000);
            number = 1;
            con1.signal();
        }catch (Exception e){
            System.out.println("3发生异常!");
        }finally {
            lock.unlock();
        }
    }
}

72269659272543f2af995354f0b0f498.png


1.2 使用Semaphore交替执行1


每次个线程执行时,先获取唯一的许可,继而打印自己的value值;之后再将许可释放给下一个信号量,让下一个信号量打印value……

public class Print123WithSemaphore {
    public static void main(String[] args) {
        Print123With pring = new Print123With();
        new Thread(() -> pring.print1()).start();
        new Thread(() -> pring.pring2()).start();
        new Thread(() -> pring.print3()).start();
    }
}
class Print123With {
    //定义三个信号量,并且这三个信号量一共只有1个许可证
    private Semaphore semaphore1 = new Semaphore(1);
    private Semaphore semaphore2 = new Semaphore(0);
    private Semaphore semaphore3 = new Semaphore(0);
    public void print1() {
        print("1", semaphore1, semaphore2);
    }
    public void pring2() {
        print("2", semaphore2, semaphore3);
    }
    public void print3() {
        print("3", semaphore3, semaphore1);
    }
    /*
        value:打印的字符
        currentSemaphore:当前信号量
        nextSemaphore:下一个信号量
     */
    private void print(String value, Semaphore currentSemaphore, Semaphore nextSemaphore) {
        for (int i = 0; i < 10; ) {
            try {
                currentSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() +" print "+ value);
                Thread.sleep(1000);
                i++;
                nextSemaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

41f58c217d2940c3a23b565fc92f7ce5.png

1.3 Semaphore交替执行2

public class SemaphorePrint {
    private static Semaphore s1 = new Semaphore(1);
    private static Semaphore s2 = new Semaphore(0);
    private static Semaphore s3 = new Semaphore(0);
    public static void main(String[] args) {
        new Thread(()->{
            while (true){
                try{
                    s1.acquire();
                    System.out.println("A");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s2.release();
                }
            }
        }).start();
        new Thread(()->{
            while (true){
                try{
                    s2.acquire();
                    System.out.println("B");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s3.release();
                }
            }
        }).start();
        new Thread(()->{
            while (true){
                try{
                    s3.acquire();
                    System.out.println("C");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s1.release();
                }
            }
        }).start();
    }
}

059dce04649045fc9134a26d01147f22.png


二、同时并发执行


2.1 Semaphore实现


默认所有的线程都是阻塞的,调用semaphore构造方法,设置线程的并发数,然后通过acquire()方法允许同一时间只能有三个线程来执行,三个线程执行完毕后,就可以调用release()来释放一次可执行的机会,然后从其他的线程里面随机挑选一个来执行。

public class SemaphoreWithBingfa {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // 同一时间,只允许3个线程并发访问
        Semaphore semp = new Semaphore(3);
        // 创建10个线程
        for (int i = 0; i < 8; i++) {
            final int threadNo = i;
            //execute()方法的参数:重写了run()方法的Runnable对象
            executor.execute(() -> {
                        try {
                            //同一时间,只能有3个线程获取许可去执行
                            semp.acquire();
                            System.out.println("得到许可并执行的线程: " + threadNo);
                            Thread.sleep(1000);
                            // 得到许可的线程执行完毕后,将许可转让给其他线程
                            semp.release();
                        } catch (InterruptedException e) {
                        }
                    }
            );
        }
        //  executor.shutdown();
    }
}


d4e78d3fabbd41bea980ac5d1bfe9b36.png


2.2 CyclicBarrier实现状态同时执行


使用这个CyclicBarrier,可以实现A,B,C全部就绪后,await()表示就绪,然后三者开始同时执行。

例如:当三个人的会议,必须等到三个人到齐后,再能开始进行执行开始。


public class CyclicBarrierTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        //等待3个人全部到齐 才能开会
        CyclicBarrier barrier = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            //设置人员的编号
            final int person = i;
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "用户" + person + "到达会议室了。");
                    barrier.await();
                    //3个人都到达  线程全部开始执行
                    System.out.println("三个人到齐," + Thread.currentThread().getName() + "开始开会干活!!!");
                } catch (Exception e) {
                }
            });
        }
        executor.shutdown();
    }
}

aa8317534fbe4ea9b0ce1e6573fbeb46.png

2.3 CountDownLatch 实现倒数计时【闭锁】


主要用于确保,某个计算,某个任务,某个服务线程,等到以来的所有线程执行完毕后,再去执行。

例如:等所有人走后,班长再走去关灯。

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try{
                    System.out.println(Thread.currentThread().getName()+"正在执行...");
                }catch (Exception e){
                }finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        try{
            countDownLatch.await();
        }catch (Exception e){
        }
        System.out.println("主线程执行...");
    }
}

c1fac128cc03416ea2c7b07f1a8e3459.png

三、ReadWriteLock读写锁实现


为了更好地解决多线程读写带来的并发问题,JUC提供了ReadWriteLock,分别为读和写进行加锁的操作。

例如:

t1线程获取读锁,t2也可以获取读锁中的内容

如果t1线程获得了写锁,其他线程不能申请写锁或者读锁,必须等待t1释放锁资源。

这一点有些像mysql中的X锁与S锁,排他锁与共享锁。


3.1 ReentrantReadWriteLock 实现读写分离

public class ReadWriteLockTest {
    private static ReentrantReadWriteLock readwrite = new ReentrantReadWriteLock();
    public static void main(String[] args) {
        // 1.开启两个线程【同时进行读和写】
        new Thread(()->{
            myRead(Thread.currentThread());
            mtWrite(Thread.currentThread());
        },"t1").start();
        new Thread(()->{
            myRead(Thread.currentThread());
            mtWrite(Thread.currentThread());
        },"t2").start();
    }
    // 1.读锁来锁定操作
    static void myRead(Thread thread){
        readwrite.readLock().lock();
        try{
            for (int i = 0; i < 10000; i++) {
                System.out.println(thread.getName()+"正在进行读操作");
            }
            System.out.println(thread.getName()+"读操作完成");
        }finally {
            readwrite.readLock().unlock();
        }
    }
    // 2.写操作来进行锁定
    static void mtWrite(Thread thread){
        readwrite.writeLock().lock();
        try{
            for (int i = 0; i < 10000; i++) {
                System.out.println(thread.getName()+"正在进行写操作");
            }
            System.out.println(thread.getName()+"写操作完成");
        }finally {
            readwrite.writeLock().unlock();
        }
    }
}

2010863834d2409ca5469fe19f05297c.png

36ead7953ca54996bc4d26a2685691f6.png


3.2 CopyOnWrite 读写分离并发容器


同步容器 :Hashtable/Vector/Stack,早期设计中,并没有考虑并发的问题,因此在多出线程的情况下,是会出现ConcurrentModificationException异常的。

除了ConcurrnetHashMap,CopyOnWriteArrayList之外,JUC还提供了CopyOnWriteArraySet,ConcurrentLinkedQueue,PriorityBlockingQueue等并发容器类。

当向一个CopyOnWrite 容器中增加元素的时候,就会将容器复制一份。如:

1.容器复制一份,然后新的容器添加元素

2.增加元素后,在将引用指向新的容器,原容器被GC回收

CopyOnWrite 利用冗余实现了读写分离,适合用于【读多写少】的场景,复制比较消耗性能


四、生产者消费者


文中采用加锁的方式来实现生产者和消费者,如synchronized和Lock,

也可以【共享缓冲区】采用阻塞队列来实现BlockingQueue来实现,设置一个固定大小的缓冲区,达到最大值则无法生产,生产者进行阻塞,知道消费者消费后,小于阻塞队列的大小,则开始进行生产。


4.1 Synchronized来实现

//测试方法
public class ProducerWithConsumerWithSynchronized {
    public static void main(String[] args) {
        // 生产者和消费者共用同一个Car对象
        Car car = new Car();
        Producer pro = new Producer(car);
        Consumer con = new Consumer(car);
        //2个生产者 2个消费者
        Thread pro1 = new Thread(pro);
        Thread pro2 = new Thread(pro);
        Thread con1 = new Thread(con);
        Thread con2 = new Thread(con);
        pro1.start();
        pro2.start();
        con1.start();
        con2.start();
    }
}
// 汽车资源类
class Car{
    int cars;
    // 生产汽车的方法
    public synchronized void productCar(){
        try{
            if (cars<20){
                System.out.println("生产汽车:"+cars);
                cars++;
                notifyAll();
            }else {
                wait();
            }
        }catch (Exception e){}
    }
    // 消费汽车的方法
    public synchronized void consumeCar(){
        try{
            if (cars>0){
                System.out.println("销售汽车:"+cars);
                cars--;
                notifyAll();
            }else {
                wait();
            }
        }catch (Exception e){}
    }
}
//生产者
class Producer implements Runnable{
    Car car;
    public Producer(Car car) {
        this.car = car;
    }
    @Override
    public void run() {
        while (true){
            car.productCar();
        }
    }
}
// 消费者
class Consumer implements Runnable{
    Car car;
    public Consumer(Car car) {
        this.car = car;
    }
    @Override
    public void run() {
        while (true){
            car.consumeCar();
        }
    }
}

71fe722b32df4c6ea00ffd4bf4d926ea.png

4.2 Lock的ReentrantLock实现

class Cars{
    int cars;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    //生产车
    public void productCar(){
        lock.lock();
        try{
            if (cars<20){
                System.out.println("生产车:"+cars);
                cars++;
                condition.signalAll();
            }else {
                condition.await();
            }
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }
    // 消费车
    public void consumeCar(){
        lock.lock();
        try{
            if (cars>0){
                System.out.println("消费车:"+cars);
                cars--;
                condition.signalAll();
            }else {
                condition.await();
            }
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }
}


目录
相关文章
|
11天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
5月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
98 0
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
26 1
|
3月前
|
Java C++
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
40 0
|
5月前
|
Java
【多线程面试题十六】、谈谈ReentrantLock的实现原理
这篇文章解释了`ReentrantLock`的实现原理,它基于Java中的`AbstractQueuedSynchronizer`(AQS)构建,通过重写AQS的`tryAcquire`和`tryRelease`方法来实现锁的获取与释放,并详细描述了AQS内部的同步队列和条件队列以及独占模式的工作原理。
【多线程面试题十六】、谈谈ReentrantLock的实现原理
|
3月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
37 0
|
5月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
5月前
|
Java 开发者
Java多线程教程:使用ReentrantLock实现高级锁功能
Java多线程教程:使用ReentrantLock实现高级锁功能
52 1
|
5月前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
105 2
|
5月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。