多线程必考的「生产者 - 消费者」模型,看齐姐这篇文章就够了

简介: 生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。也是面试中无论中美大厂都非常爱考的一个问题,对应届生问的要少一些,但是对于有工作经验的工程师来说,非常爱考。这个问题有非常多的版本和解决方式,在本文我重点是和大家壹齐理清思路,由浅入深的思考问题,保证大家看完了都能有所收获。

问题背景


简单来说,这个模型是由两类线程构成:


  • 生产者线程:“生产”产品,并把产品放到一个队列里;


  • 消费者线程:“消费”产品。


image.png


有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。


所以该模型实现了生产者和消费者之间的解藕异步


什么是异步呢?


比如说你和你女朋友打电话,就得等她接了电话你们才能说话,这是同步。


但是如果你跟她发微信,并不需要等她回复,她也不需要立刻回复,而是等她有空了再回,这就是异步。


但是呢,生产者和消费者之间也不能完全没有联系的。


  • 如果队列里的产品已经满了,生产者就不能继续生产;


  • 如果队列里的产品从无到有,生产者就得通知一下消费者,告诉它可以来消费了;


  • 如果队列里已经没有产品了,消费者也无法继续消费;


  • 如果队列里的产品从满到不满,消费者也得去通知下生产者,说你可以来生产了。


所以它们之间还需要有协作,最经典的就是使用 Object 类里自带的 wait()notify() 或者 notifyAll() 的消息通知机制。


上述描述中的等着,其实就是用 wait() 来实现的;


通知,就是 notify() 或者 notifyAll()


那么基于这种消息通知机制,我们还能够平衡生产者和消费者之间的速度差异


如果生产者的生产速度很慢,但是消费者消费的很快,就像是我们每月工资就发两次,但是每天都要花钱,也就是 1:15.


那么我们就需要调整生产者(发工资)为 15 个线程,消费者保持 1 个线程,这样是不是很爽~


**总结下该模型的三大优点:


解藕,异步,平衡速度差异。**


wait()/notify()


接下来我们需要重点看下这个通知机制。


wait()notify() 都是 Java 中的 Object 类自带的方法,可以用来实现线程间的通信。


上一节讲的 11 个 APIs 里我也提到了它,我们这里再展开讲一下。


wait() 方法是用来让当前线程等待,直到有别的线程调用 notify() 将它唤醒,或者我们可以设定一个时间让它自动苏醒。


调用该方法之前,线程必须要获得该对象的对象监视器锁,也就是只能用在加锁的方法下。


而调用该方法之后,当前线程会释放锁。(提示:这里很重要,也是下文代码中用 while 而非 if 的原因。)


notify() 方法只能通知一个线程,如果多个线程在等待,那就唤醒任意一个。


notifyAll() 方法是可以唤醒所有等待线程,然后加入同步队列。


image.png


这里我们用到了 2 个队列:


  • 同步队列:对应于我们上一节讲的线程状态中的 Runnable,也就是线程准备就绪,就等着抢资源了。


  • 等待队列:对应于我们上一节讲的线程状态中的 Waiting,也就是等待状态。


这里需要注意,从等待状态线程无法直接进入 Q2,而是要先重新加入同步队列,再次等待拿锁,拿到了锁才能进去 Q2;一旦出了 Q2,锁就丢了。


Q2 里,其实只有一个线程,因为这里我们必须要加锁才能进行操作。


实现


这里我首先建了一个简单的 Product 类,用来表示生产和消费的产品,大家可以自行添加更多的 fields


public class Product  {
    private String name;
    public Product(String name) {
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}


主函数里我设定了两类线程,并且这里选择用普通的 ArrayDeque 来实现 Queue,更简单的方式是直接用 Java 中的 BlockingQueue 来实现。


BlockingQueue 是阻塞队列,它有一系列的方法可以让线程实现自动阻塞,常用的 BlockingQueue 有很多,后面会单独出一篇文章来讲。


这里为了更好的理解并发协同的这个过程,我们先自己处理。


public class Test {
    public static void main(String[] args) {
        Queue<Product> queue = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            new Thread(new Producer(queue, 100)).start();
            new Thread(new Consumer(queue, 100)).start();
        }
    }
}


然后就是 ProducerConsumer 了。


public class Producer implements Runnable{
    private Queue<Product> queue;
    private int maxCapacity;
    public Producer(Queue queue, int maxCapacity) {
        this.queue = queue;
        this.maxCapacity = maxCapacity;
    }
    @Override
    public void run() {
        synchronized (queue) {
            while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
                try {
                    System.out.println("生产者" + Thread.currentThread().getName() + "等待中... Queue 已达到最大容量,无法生产");
                    wait();
                    System.out.println("生产者" + Thread.currentThread().getName() + "退出等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (queue.size() == 0) { //队列里的产品从无到有,需要通知在等待的消费者
                queue.notifyAll();
            }
            Random random = new Random();
            Integer i = random.nextInt();
            queue.offer(new Product("产品"  + i.toString()));
            System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + i.toString());
        }
    }
}


其实它的主逻辑很简单,我这里为了方便演示加了很多打印语句才显得有点复杂。


我们把主要逻辑拎出来看:


public void run() {
        synchronized (queue) {
            while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (queue.size() == 0) {
                queue.notifyAll();
            }
            queue.offer(new Product("产品"  + i.toString()));
        }
    }
}


这里有 3 块内容,再对照这个过程来看:


image.png


  1. 生产者线程拿到锁后,其实就是进入了 Q2 阶段。首先检查队列是否容量已满,如果满了,那就要去 Q3 等待;


  1. 如果不满,先检查一下队列原本是否为空,如果原来是空的,那就需要通知消费者;


  1. 最后生产产品。


这里有个问题,为什么只能用 while 而不是 if


其实在这一小段,生产者线程经历了几个过程:


  1. 如果队列已满,它就没法生产,那也不能占着位置不做事,所以要把锁让出来,去 Q3 - 等待队列 等着;


  1. 在等待队列里被唤醒之后,不能直接夺过锁来,而是要先加入 Q1 - 同步队列 等待资源;


  1. 一旦抢到资源,关门上锁,才能来到 Q2 继续执行 wait() 之后的活,但是,此时这个队列有可能又满了,所以退出 wait() 之后,还需要再次检查 queue.size() == maxCapacity 这个条件,所以要用 while


那么为什么可能又满了呢?


因为线程没有一直拿着锁,在被唤醒之后,到拿到锁之间的这段时间里,有可能其他的生产者线程先拿到了锁进行了生产,所以队列又经历了一个从不满到满的过程。


总结:在使用线程的等待通知机制时,一般都要在 while 循环中调用 wait() 方法。


消费者线程是完全对称的,我们来看代码。


public class Consumer implements Runnable{
    private Queue<Product> queue;
    private int maxCapacity;
    public Consumer(Queue queue, int maxCapacity) {
        this.queue = queue;
        this.maxCapacity = maxCapacity;
    }
    @Override
    public void run() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    System.out.println("消费者" + Thread.currentThread().getName() + "等待中... Queue 已缺货,无法消费");
                    wait();
                    System.out.println("消费者" + Thread.currentThread().getName() + "退出等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (queue.size() == maxCapacity) {
                queue.notifyAll();
            }
            Product product = queue.poll();
            System.out.println("消费者" + Thread.currentThread().getName() + "消费了:" + product.getName());
        }
    }
}


结果如下:


image.png


小结


生产者 - 消费者问题是面试中经常会遇到的题目,本文首先讲了该模型的三大优点:解藕,异步,平衡速度差异,然后讲解了等待/通知的消息机制以及在该模型中的应用,最后进行了代码实现。


文中所有代码已经放到了我的 Github 上:

https://github.com/xiaoqi6666/NYCSDE


这个 Github 汇总了我所有的文章和资料,之后也会一直更新和维护,还希望大家帮忙点个 Star,你们的支持和认可,就是我创作的最大动力,我们下篇文章见!


我是小齐,纽约程序媛,终生学习者,每天晚上 9 点,云自习室里不见不散!

目录
相关文章
|
1天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
2月前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
25 1
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
33 0
|
3天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
14 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
60 1
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
32 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
25 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
41 2