JUC并发编程学习(四)-生产者与消费者

简介: JUC并发编程学习(四)-生产者与消费者

生产者和消费者

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-bufferproblem),是一个多线程同步问题的经典案例。

简单说明,就是一个类用于生产,一个类用于消费。生产者生产一定的数据放入管存区,然后重复此操作,同时,消费者消费缓存区的数据。生产者和消费者之间必须保持同步 。要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

解决方法(思路)

采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。


在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。


解决问题的核心

  保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

1.wait() / notify()方法—传统

2.await() / signal()方法—新版

3.BlockingQueue阻塞队列方法

4.信号量

5.管道

接下来重点介绍,1,2方法

生产者和消费者 synchroinzed 版 (wait/notify()方法)

题目:现在两个线程,操作一个初始值为0的变量 一个线程 + 1, 一个线程 -1。判断什么时候+1,什么时候-1

交替10 次

package com.juc.study.lockdemo;
/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
    }
}
//属性、方法
class Data {
    private int num = 0;
    //+1操作
    public synchronized void increment() {
        //判断
        if (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }
    //-1 操作
    public synchronized void decrement() {
        //判断
        if (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }
}

控制台输出:

20200401134307494.png

使用了 synchronized关键字,当前只有2个操作是没存在什么问题。再加多加几个线程试试。

问题升级:防止虚假唤醒,4个线程,两个加,两个减

package com.juc.study.lockdemo;
/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();
    }
}
//属性、方法
class Data {
    private int num = 0;
    //+1操作
    public synchronized void increment() {
        //判断
        if (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }
    //-1 操作
    public synchronized void decrement() {
        //判断
        if (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }
}

然后发现出现了下面这样的问题。这是虚假唤醒的问题。

20200401134307494.png

虚假唤醒:FalseNotify,线程竞争监视器导致的虚假唤醒。

虚假唤醒原因:A线程由wait被notify后,状态由WAITING变成BLOCKED状态,来竞争监视器,但是另外一个线程B也处于BLOCKED状态,它也会来竞争监视器,这是,CPU是没办法控制到底是谁先拿到监视器。

如果不是wait的A线程先拿到监视器,那当wait的A线程拿到监视器的时候,共享的值已经改变了。查看了一下JDK文档,有做这样的说明

20200401134307494.png

原因是我们写的判断语句使用的是if,这样的写法引发了虚假唤醒的问题。改为while就没有这样的问题。

package com.juc.study.lockdemo;
/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();
    }
}
//属性、方法
class Data {
    private int num = 0;
    //+1操作
    public synchronized void increment() {
        //判断
       while (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }
    //-1 操作
    public synchronized void decrement() {
        //判断
       while (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }
}

生产者和消费者-新版JUC写法

查看jdk1.8文档,可看到 使用了Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。

20200401134307494.png

20200401134307494.png

手写生产者消费者问题:100 分写法

package com.juc.study.lockdemo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知  //新版写法 
* @Version: 1.0
*/
public class B {
    public static void main(String[] args) {
        Data2 data = new Data2();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();
    }
}
//属性、方法
class Data2 {
    private int num = 0;
    //定义锁
    private Lock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();
    //+1操作
    public  void increment() {
        //加锁
        lock.lock();
        try {
            //判断
            while (num > 0) {
                 condition.await(); //等待
             }
            //干活
            num++;
            System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
            condition.signalAll();//通知
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            lock.unlock();
        }
    }
    //-1 操作
    public  void decrement() {
        //加锁
        lock.lock();
        try {
            //判断
            while (num == 0) {
                condition.await();//等待
             }
            //干活
            num--;
            System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
            condition.signalAll();//通知
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//    解锁
        }
    }
}

以上4个进程,唤醒是随机的,即进程的执行时随机的,不是有序进行的,那怎么能让进程有序执行呢。接下来我们讨论进程间如何精确通知访问。

控制线程精确通知顺序访问 Condition

Lock提供了一个接口Condition,通过Lock类对象获取Condition实现类对象。通过Condition,可以指定唤醒哪个进程。


condition最大的特点是可以实现精确顺序通知线程的使用.

一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其newCondition()方法。


例如,假设我们有一个有限的缓冲区,它支持put和take方法。 如果在一个空的缓冲区尝试一个take ,则线程将阻塞直到一个项目可用; 如果put试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put线程和take线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition实例来实现。

  class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 
   final Object[] items = new Object[100];
   int putptr, takeptr, count;
   public void put(Object x) throws InterruptedException {
     lock.lock(); try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally { lock.unlock(); }
   }
   public Object take() throws InterruptedException {
     lock.lock(); try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally { lock.unlock(); }
   }
 } 

实例

使用num进行数字变换,1,2,3分别代表A,B,C。 当num=1时,A打印3次,打印完通知B线程打印10次,B线程打印结束通知C线程。

package com.juc.study.lockdemo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-15 14:19
* @description:    TODO 精确线程调用A->B->C->A
* @Version: 1.0
*/
public class C {
    public static void main(String[] args) {
        Data3 da = new Data3();
        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print3();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print10();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print5();
            }
        }, "C").start();
    }
}
class Data3 {
    private int num = 1;  //1:A  2:B 3:C ,1表示A,2表示B...
    //锁
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition(); 3个判断,交替执行 A--B-- C--A
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    /**
     * A :打印3次后,通知B
     */
    public void print3() {
        lock.lock();
        //获取锁
        try {
            while (num != 1) {
                conditionA.await();
            }
            //干活
            for (int i = 0; i < 3; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 2;
            //第一个线程通知第2个线程,第2个通知第3个。。
            conditionB.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }
    /**
     * B :打印10次后,通知C
     */
    public void print10() {
        //获取锁
        lock.lock();
        try {
            while (num != 2) {
                conditionB.await();//等待
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 3;
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    /**
     * C:打印5次后,通知A
     */
    public void print5() {
        lock.lock();
        try {
            while (num != 3) {
                conditionC.await();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 1;
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

打印结果:

20200401134307494.png

其他实现方法参考:

https://blog.csdn.net/zgj12138/article/details/74012263?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1


有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~


相关文章
|
容器
多线程学习之生产者和消费者与阻塞队列的关系
多线程学习之生产者和消费者与阻塞队列的关系
48 0
|
消息中间件 Java 测试技术
Java多线程消费消息
关键词:Java,多线程,消息队列,rocketmq 多线程一个用例之一就是消息的快速消费,比如我们有一个消息队列我们希望以更快的速度消费消息,假如我们用的是rocketmq,我们从中获取消息,然后使用多线程处理。
115 0
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
23 0
|
3月前
|
Java
并发编程之生产者和消费者问题
该博客文章通过Java代码示例介绍了生产者和消费者问题的线程间通信解决方案,演示了如何使用synchronized关键字和wait/notifyAll方法来实现线程间的同步和资源的协调访问。
|
安全 Java
【JUC基础】06. 生产者和消费者问题
学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。
147 0
|
存储 Java
Java基础进阶多线程-生产者和消费者模式
Java基础进阶多线程-生产者和消费者模式
Java基础进阶多线程-生产者和消费者模式
|
存储 缓存 Java
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列
JUC并发编程学习(十)-阻塞队列、同步队列
|
Java
Java 线程 案例:生产者与消费者
Java 线程 案例:生产者与消费者
164 0
Java 线程 案例:生产者与消费者
|
安全 Java
Java多线程——生产者/消费者问题
生产者/消费者问题
162 0