一线大厂面试真题,模拟消费者-生产者场景。
同样今天的分享,我们不纸上谈兵,也不空谈八股文。以实际面经、工作实战经验进行开题,然后再剖析核心源码原理。
按常见面经要求,生产者生产完指定数量产品后,才能消费。消费者消费完这批产品后,生产者才能继续生产。
我们利用Condition可以协调线程之间的通知执行和阻塞等待特性,进行实现经典的【生产者-消费者】场景。代码有详细描述,
用最少代码演示,尽力让大家看一遍就能懂、能复用。
package lading.java.mutithread; import cn.hutool.core.date.DateTime; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 模拟生产者-消费者,生产者生产了产品后消费者才能消费,消费者消费完产品后才能继续生产产品 * 每次随机最多生产4件产品,消费者消费完才能继续生产 */ public class Demo011Condition { //可消费的产品数量 public static AtomicInteger productNum = new AtomicInteger(0); //可重入锁 public static ReentrantLock lock = new ReentrantLock(); //消费者信号 public static Condition consumer = lock.newCondition(); //生产者信号 public static Condition producer = lock.newCondition(); public static void main(String[] args) { new Thread(() -> { while (true) { lock.lock(); try { //如果有产品可以消费 while (productNum.get() > 0) { //生产者线程阻塞等待,并释放锁 producer.await(); } //随机生产最多4件产品,耗时3s Thread.sleep(3000); int total = new Random().nextInt(4) + 1; for (int i = 0; i < total; i++) { System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") + " 【" + Thread.currentThread().getName() + "】新生产了产品:" + productNum.incrementAndGet()); } //通知消费者来消费 System.out.println("生产者 --> 已生产" + total + "件产品,可以过来消费。"); consumer.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "生产者").start(); new Thread(() -> { while (true) { lock.lock(); try { //没有可以消费,消费者进入阻塞等待,释放锁 while (productNum.get() == 0) { consumer.await(); } //消费者消费产品,耗时2s Thread.sleep(2000); int total = productNum.get(); while (productNum.get() != 0) { System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") + " 【" + Thread.currentThread().getName() + "】消费了产品:" + productNum.getAndDecrement()); } //唤醒生产者继续生产 System.out.println("消费者 --> " + total + "件产品已清空,请继续生产。"); producer.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "消费者").start(); } }
结果,消费者和生产者交叉运行。
生产好了产品,通知消费者消费;
消费者消费完产品,通知生产者继续生产产品。
1、Condition是什么?
Condition很简单,它只是JUC包里的一个接口。定义了2个核心方法。
一个是await()方法,可以让线程进入阻塞等待。
另一个是signal()方法,可以唤醒指定线程。
1.1 具体看看Condition源码
在AQS的内部类ConditionObject对这个接口进行了实现。我们看ConditionObject,他就是AQS里经典的FIFO条件等待队列。
ConditionObject的类结构图。
看了源码结构,和刚才总结一样,Condition有2大核心方法:
其中,await(),还有一个阻塞等待时间入参可选await(long time, TimeUnit unit):让当前线程进入阻塞,等待该条件唤醒,并设置等待超时时间;
然后signal()是唤醒等待该Condition的一个线程;而signalAll()是唤醒所有等待该Condition条件的线程。
2、Condition的条件等待唤醒和Object.wait()、notify()有什么区别
问到正主了,其实这个问题也就是问:Condition的优点有哪些。最开始我们学JAVA编程,都是从synchronized开始,线程并发协调,常用的就是对象的wait(),nofity()方法。
说到Condition的条件等待和唤醒,与JAVA对象的等待唤醒机制区别,就应该先说说ReentrantLock和synchronized的区别。这个区别在之前的文章《ReentrantLock核心原理剖析》有过分享。这里再补充几个。其中之一,就是ReentrantLock通过Condition条件队列实现多路通知,而synchronized只能单路通知。具体就是ReentrantLock可以通过多个不同的condition条件队列进行等待和唤醒,synchronized相当于功能单一的锁。
另外synchronized的锁是通过JVM实现,而ReentrantLock是通过接口api实现。
那Condition条件队列和对象的等待阻塞最大不同是什么?
一句话:Condition支持按顺序精准唤醒线程。而Object的做不到。
比如要求实现A和B执行完成后,C才能执行,而且C执行完成后才能执行D、F。任意指定协调条件,Condition都可以支持。
篇幅有限,这个精准唤醒多条件协调案例demo先不放上来了。
3、说说Condition条件队列核心原理
那我们就说说await()方法是如何实现的,一句话,核心就是:调用await()方法后,当前线程进入阻塞,同时释放锁。
具体我们看看源码。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //将自己加入到AQS的FIFO等待队列尾部,进入等待 Node node = addConditionWaiter(); //释放锁资源 int savedState = fullyRelease(node); int interruptMode = 0; //检测本线程节点是否在AQS同步队列中,如果不在就继续检测,直到发现在AQS同步队列中 while (!isOnSyncQueue(node)) { //如果在队列中,就挂起本线程 LockSupport.park(this); //如果被中断,则跳出循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //尝试获取锁-毕竟这期间有可能被唤醒了,以及是否发生过中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
具体await()方法里,就是线程进入了阻塞状态,以及自旋,不断判断自己NODE节点是在等待队列,还是在AQS队列。如果在AQS队列,就说明已经被唤醒了,不用再继续阻塞。如果还在等待队列,就继续自旋。
最后,我们再看看signal()唤醒方法。唤醒该条件队列线程。如果ReentrantLock是公平锁,就唤醒等待时间最长的头节点线程,如果是非公平锁,就唤醒整个等待队列。
唤醒操作,其实就是从等待队列中,将头节点挪到AQS节点,这样这个节点就被唤醒,不用再阻塞。如果是signalAll(),就是将全部等待队列节点,挪到AQS,唤醒他们。
public final void signal() { //判断当前线程是否持有锁,只有持有锁才能唤醒其他线程,不持有锁,就抛异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } //具体唤醒过程 private void doSignal(Node first) { //尝试将头节点NODE唤醒,从等待队列中,挪到AQS队列,并通过CAS操作更新NODE状态 do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
今天就分享这么多,目前已分享synchronized、volatile、AQS、CAS、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier、Condition,并发编程里的基础中的基础已经分享完了.
下次我们分享线程池、并发容器、ThreadLocal、Future、FutureTask。