1、前言
学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。
2、什么是生产者消费者问题
生产者消费者问题是一种经典的多线程问题,用于描述生产者和消费者之间的数据交换问题。其实本质上就是线程间通信问题,即线程等待唤醒和通知唤醒。
生产者消费者问题通常包含以下三个元素:
- 生产者:负责生产数据,并将其放入共享的缓冲区中。
- 消费者:负责从缓冲区中取出数据,并进行消费。
- 缓冲区:用于存放生产者生产的数据,消费者从中取出数据进行消费。
在实际应用中,生产者和消费者可能存在速度差异,导致缓冲区的数据量不断变化。如果缓冲区满了,生产者需要等待,直到消费者取走了一部分数据。同样,如果缓冲区为空,消费者需要等待,直到生产者生产了一些数据放入缓冲区中。
3、Synchronized解决方案
synchronized解决方案,一般采用wait()(等待唤醒)和notifyAll()(通知唤醒)进行线程的同步通信。
- wait()方法用于使当前线程等待,直到另一个线程调用相同对象上的notify()方法或notifyAll()方法来唤醒它。wait()方法必须在synchronized块或方法中调用,以确保线程获得对象的监视器锁。
- notify()方法用于通知等待在相同对象上的某个线程,告诉它们可以继续运行。
- notifyAll()方法则通知等待在相同对象上的所有线程。
调用wait()方法会释放锁,使当前线程进入等待状态,直到其他线程调用相同对象上的notify()方法或notifyAll()方法唤醒它。而notify()方法则会随机选择一个等待的线程唤醒,而notifyAll()则会唤醒所有等待的线程,让它们竞争锁。
public class ProducerConsumerExample { public static void main(String[] args) { NumberOper object = new NumberOper(); new Thread(() -> { for (int i = 0; i < 10; i++) { object.add(); } }, "thread-add-1").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { object.sub(); } }, "thread-sub-1").start(); } } class NumberOper { private int number = 0; public synchronized void add() { if(number != 0) { try { // 等待唤醒 this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number++; System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number); // 通知其他唤醒 this.notifyAll(); } public synchronized void sub() { if(number == 0) { try { // 等待唤醒 this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number); // 通知其他唤醒 this.notifyAll(); } }
执行结果:
不过需要注意的是,上面的代码没有考虑到多线程并发的情况,如果多个生产者和多个消费者同时访问缓冲区,就需要使用线程安全的数据结构或加锁来保证线程安全。也就是虚假唤醒问题。
虚假唤醒问题,请参考《wait(),notify()虚假唤醒》篇幅。
4、Lock解决方案
Synchronized解决方案,主要是依赖于wait()和notify()方法解决。相应的JUC中的Lock也是类似的解决手段。
- Synchronized:(注意wait()和notify()方法是Object的方法)
- wait():线程等待,直到其他线程将他唤醒
- notify():唤醒其他等待的线程
- notifyAll():换新所有等待的线程
- Lock:
- await():线程等待,直到其他线程将他唤醒
- signal():唤醒正在等待的线程
- signalAll():唤醒正在等待的线程
使用JUC Lock来解决生产者消费者问题,可以使用Condition(条件变量)来实现。
Condition是基于Lock来创建的,每个Condition对象都和一个Lock对象绑定。Condition对象提供了类似wait()和notify()的方法来控制线程的等待和唤醒。Condition对象可以通过Lock对象的newCondition()方法创建。
生产者消费者问题中,我们可以使用两个Condition对象来控制生产者和消费者的等待和唤醒。当缓冲区为空时,消费者线程等待,当缓冲区满时,生产者线程等待。
package com.github.fastdev.waitnotify; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author Shamee loop * @date 2023/4/9 */ public class ProducerConsumerExample { public static void main(String[] args) { NumberOper object = new NumberOper(); new Thread(() -> { for (int i = 0; i < 10; i++) { object.add(); } }, "thread-add-1").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { object.sub(); } }, "thread-sub-1").start(); } } class NumberOper { private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void add() { lock.lock(); try { if (number != 0) { condition.await(); } number++; System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void sub() { lock.lock(); try { if (number == 0) { condition.await(); } number--; System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
执行结果:
5、Condition
那么既然synchronized就能解决生产者消费者问题,为什么还需要JUC的Lock这种方式呢? 从代码量上看,Lock的方式明显比较繁琐。
当然,存在即合理。JUC实现了Lock的方式,且引入了Condition。肯定是具备了synchronized所没有的特性。
试想一个场景:
synchronized的notify()虽然唤醒了等待的线程。但是如果存在多个等待的线程呢?唤醒后获得执行权的需要取决于分配策略。那么有没有一种可能,我需要指定唤醒某个等待的线程?Condition就来了,他可以指定唤醒某个线程,也就是精准唤醒。
Condition 是 Java 中 Lock 的一个重要组件,可以用于实现更加灵活、高效的线程同步。它提供了类似于 Object.wait() 和 Object.notify() 的等待/通知机制,但相较于传统的 synchronized,它更加灵活,可以实现更多高级特性。
Condition 的主要作用是允许线程在等待某些条件的情况下暂停执行(即阻塞线程),并且当条件满足时,可以重新唤醒这些线程。Condition 与 Lock 一起使用,通常需要创建一个 Lock 对象,然后调用 Lock 的 newCondition() 方法来创建一个 Condition 对象。
Condition 接口中最常用的方法包括:
- await():当前线程等待,直到被通知或中断;
- awaitUninterruptibly():当前线程等待,直到被通知,但不会响应中断;
- signal():唤醒一个等待中的线程;
- signalAll():唤醒所有等待中的线程。
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerExample { private final Queue<Integer> queue = new LinkedList<>(); private final int capacity = 10; private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public void produce() throws InterruptedException { lock.lock(); try { while (queue.size() == capacity) { notFull.await(); } int num = (int) (Math.random() * 100); queue.add(num); System.out.println("Produced " + num); // 指定唤醒线程 notEmpty.signal(); } finally { lock.unlock(); } } public void consume() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); } int num = queue.remove(); System.out.println("Consumed " + num); // 指定唤醒线程 notFull.signal(); } finally { lock.unlock(); } } public static void main(String[] args) { ProducerConsumerExample example = new ProducerConsumerExample(); Thread producerThread1 = new Thread(() -> { while (true) { try { example.produce(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread producerThread2 = new Thread(() -> { while (true) { try { example.produce(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumerThread1 = new Thread(() -> { while (true) { try { example.consume(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumerThread2 = new Thread(() -> { while (true) { try { example.consume(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); producerThread1.start(); producerThread2.start(); consumerThread1.start(); consumerThread2.start(); } }
6、小结
到此,我们学习了生产者和消费者模型,以及他的一些问题,以及如何解决。还接触了Locks中的另一个类Condition的使用。一天进步一点点,一起加油~