【JUC基础】06. 生产者和消费者问题

简介: 学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。

 1、前言

学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。

2、什么是生产者消费者问题

生产者消费者问题是一种经典的多线程问题,用于描述生产者和消费者之间的数据交换问题。其实本质上就是线程间通信问题,即线程等待唤醒和通知唤醒。

生产者消费者问题通常包含以下三个元素:

    1. 生产者:负责生产数据,并将其放入共享的缓冲区中。
    2. 消费者:负责从缓冲区中取出数据,并进行消费。
    3. 缓冲区:用于存放生产者生产的数据,消费者从中取出数据进行消费。

    在实际应用中,生产者和消费者可能存在速度差异,导致缓冲区的数据量不断变化。如果缓冲区满了,生产者需要等待,直到消费者取走了一部分数据。同样,如果缓冲区为空,消费者需要等待,直到生产者生产了一些数据放入缓冲区中。

    3、Synchronized解决方案

    synchronized解决方案,一般采用wait()(等待唤醒)和notifyAll()(通知唤醒)进行线程的同步通信。

      • wait()方法用于使当前线程等待,直到另一个线程调用相同对象上的notify()方法或notifyAll()方法来唤醒它。wait()方法必须在synchronized块或方法中调用,以确保线程获得对象的监视器锁。
      • notify()方法用于通知等待在相同对象上的某个线程,告诉它们可以继续运行。
      • notifyAll()方法则通知等待在相同对象上的所有线程。

      调用wait()方法会释放锁,使当前线程进入等待状态,直到其他线程调用相同对象上的notify()方法或notifyAll()方法唤醒它。而notify()方法则会随机选择一个等待的线程唤醒,而notifyAll()则会唤醒所有等待的线程,让它们竞争锁。

      public class ProducerConsumerExample {
          public static void main(String[] args) {
              NumberOper object = new NumberOper();
              new Thread(() -> {
                  for (int i = 0; i < 10; i++) {
                      object.add();
                  }
              }, "thread-add-1").start();
              new Thread(() -> {
                  for (int i = 0; i < 10; i++) {
                      object.sub();
                  }
              }, "thread-sub-1").start();
          }
      }
      class NumberOper {
          private int number = 0;
          public synchronized void add() {
              if(number != 0) {
                  try {
                      // 等待唤醒
                      this.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              number++;
              System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
              // 通知其他唤醒
              this.notifyAll();
          }
          public synchronized void sub() {
              if(number == 0) {
                  try {
                      // 等待唤醒
                      this.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              number--;
              System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
              // 通知其他唤醒
              this.notifyAll();
          }
      }

      image.gif

      执行结果:

      image.png

      不过需要注意的是,上面的代码没有考虑到多线程并发的情况,如果多个生产者和多个消费者同时访问缓冲区,就需要使用线程安全的数据结构或加锁来保证线程安全。也就是虚假唤醒问题。

      image.png

      虚假唤醒问题,请参考《wait(),notify()虚假唤醒》篇幅。

      4、Lock解决方案

      Synchronized解决方案,主要是依赖于wait()和notify()方法解决。相应的JUC中的Lock也是类似的解决手段。

        • Synchronized:(注意wait()和notify()方法是Object的方法)
          • wait():线程等待,直到其他线程将他唤醒
          • notify():唤醒其他等待的线程
          • notifyAll():换新所有等待的线程
            • Lock:
              • await():线程等待,直到其他线程将他唤醒
              • signal():唤醒正在等待的线程
              • signalAll():唤醒正在等待的线程

                使用JUC Lock来解决生产者消费者问题,可以使用Condition(条件变量)来实现。

                Condition是基于Lock来创建的,每个Condition对象都和一个Lock对象绑定。Condition对象提供了类似wait()和notify()的方法来控制线程的等待和唤醒。Condition对象可以通过Lock对象的newCondition()方法创建。

                生产者消费者问题中,我们可以使用两个Condition对象来控制生产者和消费者的等待和唤醒。当缓冲区为空时,消费者线程等待,当缓冲区满时,生产者线程等待。

                image.png

                 

                package com.github.fastdev.waitnotify;
                import java.util.concurrent.locks.Condition;
                import java.util.concurrent.locks.Lock;
                import java.util.concurrent.locks.ReentrantLock;
                /** * @author Shamee loop * @date 2023/4/9 */
                public class ProducerConsumerExample {
                    public static void main(String[] args) {
                        NumberOper object = new NumberOper();
                        new Thread(() -> {
                            for (int i = 0; i < 10; i++) {
                                object.add();
                            }
                        }, "thread-add-1").start();
                        new Thread(() -> {
                            for (int i = 0; i < 10; i++) {
                                object.sub();
                            }
                        }, "thread-sub-1").start();
                    }
                }
                class NumberOper {
                    private int number = 0;
                    Lock lock = new ReentrantLock();
                    Condition condition = lock.newCondition();
                    public void add() {
                        lock.lock();
                        try {
                            if (number != 0) {
                                condition.await();
                            }
                            number++;
                            System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
                            condition.signalAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                    public void sub() {
                        lock.lock();
                        try {
                            if (number == 0) {
                                condition.await();
                            }
                            number--;
                            System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
                            condition.signalAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }

                image.gif

                执行结果:

                image.png

                5、Condition

                那么既然synchronized就能解决生产者消费者问题,为什么还需要JUC的Lock这种方式呢? 从代码量上看,Lock的方式明显比较繁琐。

                当然,存在即合理。JUC实现了Lock的方式,且引入了Condition。肯定是具备了synchronized所没有的特性。

                试想一个场景:

                synchronized的notify()虽然唤醒了等待的线程。但是如果存在多个等待的线程呢?唤醒后获得执行权的需要取决于分配策略。那么有没有一种可能,我需要指定唤醒某个等待的线程?Condition就来了,他可以指定唤醒某个线程,也就是精准唤醒。

                Condition 是 Java 中 Lock 的一个重要组件,可以用于实现更加灵活、高效的线程同步。它提供了类似于 Object.wait() 和 Object.notify() 的等待/通知机制,但相较于传统的 synchronized,它更加灵活,可以实现更多高级特性。

                Condition 的主要作用是允许线程在等待某些条件的情况下暂停执行(即阻塞线程),并且当条件满足时,可以重新唤醒这些线程。Condition 与 Lock 一起使用,通常需要创建一个 Lock 对象,然后调用 Lock 的 newCondition() 方法来创建一个 Condition 对象。

                Condition 接口中最常用的方法包括:

                  • await():当前线程等待,直到被通知或中断;
                  • awaitUninterruptibly():当前线程等待,直到被通知,但不会响应中断;
                  • signal():唤醒一个等待中的线程;
                  • signalAll():唤醒所有等待中的线程。
                  import java.util.LinkedList;
                  import java.util.Queue;
                  import java.util.concurrent.locks.Condition;
                  import java.util.concurrent.locks.Lock;
                  import java.util.concurrent.locks.ReentrantLock;
                  public class ProducerConsumerExample {
                      private final Queue<Integer> queue = new LinkedList<>();
                      private final int capacity = 10;
                      private final Lock lock = new ReentrantLock();
                      private final Condition notEmpty = lock.newCondition();
                      private final Condition notFull = lock.newCondition();
                      public void produce() throws InterruptedException {
                          lock.lock();
                          try {
                              while (queue.size() == capacity) {
                                  notFull.await();
                              }
                              int num = (int) (Math.random() * 100);
                              queue.add(num);
                              System.out.println("Produced " + num);
                               // 指定唤醒线程
                              notEmpty.signal();
                          } finally {
                              lock.unlock();
                          }
                      }
                      public void consume() throws InterruptedException {
                          lock.lock();
                          try {
                              while (queue.isEmpty()) {
                                  notEmpty.await();
                              }
                              int num = queue.remove();
                              System.out.println("Consumed " + num);
                              // 指定唤醒线程
                              notFull.signal();
                          } finally {
                              lock.unlock();
                          }
                      }
                      public static void main(String[] args) {
                          ProducerConsumerExample example = new ProducerConsumerExample();
                          Thread producerThread1 = new Thread(() -> {
                              while (true) {
                                  try {
                                      example.produce();
                                      Thread.sleep(1000);
                                  } catch (InterruptedException e) {
                                      e.printStackTrace();
                                  }
                              }
                          });
                          Thread producerThread2 = new Thread(() -> {
                              while (true) {
                                  try {
                                      example.produce();
                                      Thread.sleep(1000);
                                  } catch (InterruptedException e) {
                                      e.printStackTrace();
                                  }
                              }
                          });
                          Thread consumerThread1 = new Thread(() -> {
                              while (true) {
                                  try {
                                      example.consume();
                                      Thread.sleep(1000);
                                  } catch (InterruptedException e) {
                                      e.printStackTrace();
                                  }
                              }
                          });
                          Thread consumerThread2 = new Thread(() -> {
                              while (true) {
                                  try {
                                      example.consume();
                                      Thread.sleep(1000);
                                  } catch (InterruptedException e) {
                                      e.printStackTrace();
                                  }
                              }
                          });
                          producerThread1.start();
                          producerThread2.start();
                          consumerThread1.start();
                          consumerThread2.start();
                      }
                  }

                  image.gif

                  6、小结

                  到此,我们学习了生产者和消费者模型,以及他的一些问题,以及如何解决。还接触了Locks中的另一个类Condition的使用。一天进步一点点,一起加油~

                  相关文章
                  |
                  容器
                  多线程学习之生产者和消费者与阻塞队列的关系
                  多线程学习之生产者和消费者与阻塞队列的关系
                  63 0
                  |
                  消息中间件 Java 测试技术
                  Java多线程消费消息
                  关键词:Java,多线程,消息队列,rocketmq 多线程一个用例之一就是消息的快速消费,比如我们有一个消息队列我们希望以更快的速度消费消息,假如我们用的是rocketmq,我们从中获取消息,然后使用多线程处理。
                  131 0
                  |
                  5月前
                  |
                  Java
                  并发编程之生产者和消费者问题
                  该博客文章通过Java代码示例介绍了生产者和消费者问题的线程间通信解决方案,演示了如何使用synchronized关键字和wait/notifyAll方法来实现线程间的同步和资源的协调访问。
                  |
                  5月前
                  |
                  安全
                  LinkedBlockingQueue实现的生产者和消费者模型
                  LinkedBlockingQueue实现的生产者和消费者模型
                  44 1
                  |
                  7月前
                  |
                  并行计算 安全 Go
                  可重入锁实现消费者和生产者的例子
                  【6月更文挑战第28天】本文探讨了Python和Go中使用可重入锁(RLock)进行线程同步以及异步操作。异步存取示例展示了goroutine的并发优势,启动简单且运行异步。goroutine的调度和并发处理能力是其高效并发的关键。
                  42 0
                  可重入锁实现消费者和生产者的例子
                  |
                  8月前
                  |
                  Java
                  用java实现生产者和消费者模式
                  用java实现生产者和消费者模式
                  64 1
                  生产者消费者问题(生产者和消费者分别阻塞于不同的锁)
                  生产者消费者问题(生产者和消费者分别阻塞于不同的锁)
                  |
                  安全 数据处理
                  线程中的生产者和消费者模式
                  线程中的生产者和消费者模式
                  132 0
                  线程中的生产者和消费者模式
                  |
                  缓存 Java 数据安全/隐私保护
                  JUC并发编程学习(四)-生产者与消费者
                  JUC并发编程学习(四)-生产者与消费者
                  JUC并发编程学习(四)-生产者与消费者
                  |
                  安全 Java
                  Java多线程——生产者/消费者问题
                  生产者/消费者问题
                  171 0