【Semaphore、ReentrantLock、CountDownLatch、Cyclicbarrier、ReadWriteLock】多线程交替执行、顺序执行、同时执行、读写分离执行、生产者消费者

简介: 【Semaphore、ReentrantLock、CountDownLatch、Cyclicbarrier、ReadWriteLock】多线程交替执行、顺序执行、同时执行、读写分离执行、生产者消费者

一、交替执行


建立3个线程,完成交替执行,按照123123123…打印


1.1 使用Lock的ReentrantLock实现

public class ThreadPrint {
    public static void main(String[] args) {
        Print123 print123 = new Print123();
        new Thread(()->{
           while (true){
               print123.print1();
           }
        }).start();
        new Thread(()->{
            while (true){
                print123.print2();
            }
        }).start();
        new Thread(()->{
            while (true){
                print123.print3();
            }
        }).start();
    }
}
// 线程资源类
class Print123{
    private int number = 1;
    private Lock lock = new ReentrantLock();
    private Condition con1 = lock.newCondition();
    private Condition con2 = lock.newCondition();
    private Condition con3 = lock.newCondition();
    public void print1(){
        lock.lock();
        try{
            if (number!=1){
                con1.await();
            }
            System.out.println(1);
            Thread.sleep(1000);
            number = 2;
            con2.signal();
        }catch (Exception e){
            System.out.println("1发生异常!");
        }finally {
            lock.unlock();
        }
    }
    public void print2(){
        lock.lock();
        try{
            if (number!=2){
                con2.await();
            }
            System.out.println(2);
            Thread.sleep(1000);
            number = 3;
            con3.signal();
        }catch (Exception e){
            System.out.println("2发生异常!");
        }finally {
            lock.unlock();
        }
    }
    public void print3(){
        lock.lock();
        try{
            if (number!=3){
                con3.await();
            }
            System.out.println(3);
            Thread.sleep(1000);
            number = 1;
            con1.signal();
        }catch (Exception e){
            System.out.println("3发生异常!");
        }finally {
            lock.unlock();
        }
    }
}

72269659272543f2af995354f0b0f498.png


1.2 使用Semaphore交替执行1


每次个线程执行时,先获取唯一的许可,继而打印自己的value值;之后再将许可释放给下一个信号量,让下一个信号量打印value……

public class Print123WithSemaphore {
    public static void main(String[] args) {
        Print123With pring = new Print123With();
        new Thread(() -> pring.print1()).start();
        new Thread(() -> pring.pring2()).start();
        new Thread(() -> pring.print3()).start();
    }
}
class Print123With {
    //定义三个信号量,并且这三个信号量一共只有1个许可证
    private Semaphore semaphore1 = new Semaphore(1);
    private Semaphore semaphore2 = new Semaphore(0);
    private Semaphore semaphore3 = new Semaphore(0);
    public void print1() {
        print("1", semaphore1, semaphore2);
    }
    public void pring2() {
        print("2", semaphore2, semaphore3);
    }
    public void print3() {
        print("3", semaphore3, semaphore1);
    }
    /*
        value:打印的字符
        currentSemaphore:当前信号量
        nextSemaphore:下一个信号量
     */
    private void print(String value, Semaphore currentSemaphore, Semaphore nextSemaphore) {
        for (int i = 0; i < 10; ) {
            try {
                currentSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() +" print "+ value);
                Thread.sleep(1000);
                i++;
                nextSemaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

41f58c217d2940c3a23b565fc92f7ce5.png

1.3 Semaphore交替执行2

public class SemaphorePrint {
    private static Semaphore s1 = new Semaphore(1);
    private static Semaphore s2 = new Semaphore(0);
    private static Semaphore s3 = new Semaphore(0);
    public static void main(String[] args) {
        new Thread(()->{
            while (true){
                try{
                    s1.acquire();
                    System.out.println("A");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s2.release();
                }
            }
        }).start();
        new Thread(()->{
            while (true){
                try{
                    s2.acquire();
                    System.out.println("B");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s3.release();
                }
            }
        }).start();
        new Thread(()->{
            while (true){
                try{
                    s3.acquire();
                    System.out.println("C");
                    Thread.sleep(1000);
                }catch (Exception e){
                    System.out.println("异常");
                }finally {
                    s1.release();
                }
            }
        }).start();
    }
}

059dce04649045fc9134a26d01147f22.png


二、同时并发执行


2.1 Semaphore实现


默认所有的线程都是阻塞的,调用semaphore构造方法,设置线程的并发数,然后通过acquire()方法允许同一时间只能有三个线程来执行,三个线程执行完毕后,就可以调用release()来释放一次可执行的机会,然后从其他的线程里面随机挑选一个来执行。

public class SemaphoreWithBingfa {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // 同一时间,只允许3个线程并发访问
        Semaphore semp = new Semaphore(3);
        // 创建10个线程
        for (int i = 0; i < 8; i++) {
            final int threadNo = i;
            //execute()方法的参数:重写了run()方法的Runnable对象
            executor.execute(() -> {
                        try {
                            //同一时间,只能有3个线程获取许可去执行
                            semp.acquire();
                            System.out.println("得到许可并执行的线程: " + threadNo);
                            Thread.sleep(1000);
                            // 得到许可的线程执行完毕后,将许可转让给其他线程
                            semp.release();
                        } catch (InterruptedException e) {
                        }
                    }
            );
        }
        //  executor.shutdown();
    }
}


d4e78d3fabbd41bea980ac5d1bfe9b36.png


2.2 CyclicBarrier实现状态同时执行


使用这个CyclicBarrier,可以实现A,B,C全部就绪后,await()表示就绪,然后三者开始同时执行。

例如:当三个人的会议,必须等到三个人到齐后,再能开始进行执行开始。


public class CyclicBarrierTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        //等待3个人全部到齐 才能开会
        CyclicBarrier barrier = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            //设置人员的编号
            final int person = i;
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "用户" + person + "到达会议室了。");
                    barrier.await();
                    //3个人都到达  线程全部开始执行
                    System.out.println("三个人到齐," + Thread.currentThread().getName() + "开始开会干活!!!");
                } catch (Exception e) {
                }
            });
        }
        executor.shutdown();
    }
}

aa8317534fbe4ea9b0ce1e6573fbeb46.png

2.3 CountDownLatch 实现倒数计时【闭锁】


主要用于确保,某个计算,某个任务,某个服务线程,等到以来的所有线程执行完毕后,再去执行。

例如:等所有人走后,班长再走去关灯。

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try{
                    System.out.println(Thread.currentThread().getName()+"正在执行...");
                }catch (Exception e){
                }finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        try{
            countDownLatch.await();
        }catch (Exception e){
        }
        System.out.println("主线程执行...");
    }
}

c1fac128cc03416ea2c7b07f1a8e3459.png

三、ReadWriteLock读写锁实现


为了更好地解决多线程读写带来的并发问题,JUC提供了ReadWriteLock,分别为读和写进行加锁的操作。

例如:

t1线程获取读锁,t2也可以获取读锁中的内容

如果t1线程获得了写锁,其他线程不能申请写锁或者读锁,必须等待t1释放锁资源。

这一点有些像mysql中的X锁与S锁,排他锁与共享锁。


3.1 ReentrantReadWriteLock 实现读写分离

public class ReadWriteLockTest {
    private static ReentrantReadWriteLock readwrite = new ReentrantReadWriteLock();
    public static void main(String[] args) {
        // 1.开启两个线程【同时进行读和写】
        new Thread(()->{
            myRead(Thread.currentThread());
            mtWrite(Thread.currentThread());
        },"t1").start();
        new Thread(()->{
            myRead(Thread.currentThread());
            mtWrite(Thread.currentThread());
        },"t2").start();
    }
    // 1.读锁来锁定操作
    static void myRead(Thread thread){
        readwrite.readLock().lock();
        try{
            for (int i = 0; i < 10000; i++) {
                System.out.println(thread.getName()+"正在进行读操作");
            }
            System.out.println(thread.getName()+"读操作完成");
        }finally {
            readwrite.readLock().unlock();
        }
    }
    // 2.写操作来进行锁定
    static void mtWrite(Thread thread){
        readwrite.writeLock().lock();
        try{
            for (int i = 0; i < 10000; i++) {
                System.out.println(thread.getName()+"正在进行写操作");
            }
            System.out.println(thread.getName()+"写操作完成");
        }finally {
            readwrite.writeLock().unlock();
        }
    }
}

2010863834d2409ca5469fe19f05297c.png

36ead7953ca54996bc4d26a2685691f6.png


3.2 CopyOnWrite 读写分离并发容器


同步容器 :Hashtable/Vector/Stack,早期设计中,并没有考虑并发的问题,因此在多出线程的情况下,是会出现ConcurrentModificationException异常的。

除了ConcurrnetHashMap,CopyOnWriteArrayList之外,JUC还提供了CopyOnWriteArraySet,ConcurrentLinkedQueue,PriorityBlockingQueue等并发容器类。

当向一个CopyOnWrite 容器中增加元素的时候,就会将容器复制一份。如:

1.容器复制一份,然后新的容器添加元素

2.增加元素后,在将引用指向新的容器,原容器被GC回收

CopyOnWrite 利用冗余实现了读写分离,适合用于【读多写少】的场景,复制比较消耗性能


四、生产者消费者


文中采用加锁的方式来实现生产者和消费者,如synchronized和Lock,

也可以【共享缓冲区】采用阻塞队列来实现BlockingQueue来实现,设置一个固定大小的缓冲区,达到最大值则无法生产,生产者进行阻塞,知道消费者消费后,小于阻塞队列的大小,则开始进行生产。


4.1 Synchronized来实现

//测试方法
public class ProducerWithConsumerWithSynchronized {
    public static void main(String[] args) {
        // 生产者和消费者共用同一个Car对象
        Car car = new Car();
        Producer pro = new Producer(car);
        Consumer con = new Consumer(car);
        //2个生产者 2个消费者
        Thread pro1 = new Thread(pro);
        Thread pro2 = new Thread(pro);
        Thread con1 = new Thread(con);
        Thread con2 = new Thread(con);
        pro1.start();
        pro2.start();
        con1.start();
        con2.start();
    }
}
// 汽车资源类
class Car{
    int cars;
    // 生产汽车的方法
    public synchronized void productCar(){
        try{
            if (cars<20){
                System.out.println("生产汽车:"+cars);
                cars++;
                notifyAll();
            }else {
                wait();
            }
        }catch (Exception e){}
    }
    // 消费汽车的方法
    public synchronized void consumeCar(){
        try{
            if (cars>0){
                System.out.println("销售汽车:"+cars);
                cars--;
                notifyAll();
            }else {
                wait();
            }
        }catch (Exception e){}
    }
}
//生产者
class Producer implements Runnable{
    Car car;
    public Producer(Car car) {
        this.car = car;
    }
    @Override
    public void run() {
        while (true){
            car.productCar();
        }
    }
}
// 消费者
class Consumer implements Runnable{
    Car car;
    public Consumer(Car car) {
        this.car = car;
    }
    @Override
    public void run() {
        while (true){
            car.consumeCar();
        }
    }
}

71fe722b32df4c6ea00ffd4bf4d926ea.png

4.2 Lock的ReentrantLock实现

class Cars{
    int cars;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    //生产车
    public void productCar(){
        lock.lock();
        try{
            if (cars<20){
                System.out.println("生产车:"+cars);
                cars++;
                condition.signalAll();
            }else {
                condition.await();
            }
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }
    // 消费车
    public void consumeCar(){
        lock.lock();
        try{
            if (cars>0){
                System.out.println("消费车:"+cars);
                cars--;
                condition.signalAll();
            }else {
                condition.await();
            }
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }
}


目录
相关文章
|
2月前
|
Python
如何在Python中使用Semaphore来实现线程同步?
如何在Python中使用Semaphore来实现线程同步?
27 7
|
4月前
|
Java 测试技术
CountDownLatch、CyclicBarrier让线程听我号令
CountDownLatch、CyclicBarrier让线程听我号令
42 0
|
4月前
|
数据处理
多线程与并发编程【线程对象锁、死锁及解决方案、线程并发协作、生产者与消费者模式】(四)-全面详解(学习总结---从入门到深化)
多线程与并发编程【线程对象锁、死锁及解决方案、线程并发协作、生产者与消费者模式】(四)-全面详解(学习总结---从入门到深化)
44 1
|
1天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
2月前
|
存储 Java 数据库连接
线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
34 0
|
3月前
|
消息中间件 安全 Java
多线程(初阶七:阻塞队列和生产者消费者模型)
多线程(初阶七:阻塞队列和生产者消费者模型)
31 0
|
4月前
|
前端开发 Java BI
自定义线程池+countdownlatch
自定义线程池+countdownlatch
22 0
|
4月前
|
Java C++
线程池-手写线程池C++11版本(生产者-消费者模型)
线程池-手写线程池C++11版本(生产者-消费者模型)
74 0
|
4月前
|
Java Linux C语言
线程池-手写线程池Linux C简单版本(生产者-消费者模型)
线程池-手写线程池Linux C简单版本(生产者-消费者模型)
44 0
|
5月前
|
SQL 供应链 安全
Linux多线程【生产者消费者模型】
Linux多线程【生产者消费者模型】
59 0