JAVA并发编程系列(10)Condition条件队列-并发协作者

本文涉及的产品
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。

一线大厂面试真题,模拟消费者-生产者场景。

   同样今天的分享,我们不纸上谈兵,也不空谈八股文。以实际面经、工作实战经验进行开题,然后再剖析核心源码原理。

   按常见面经要求,生产者生产完指定数量产品后,才能消费。消费者消费完这批产品后,生产者才能继续生产。

我们利用Condition可以协调线程之间的通知执行和阻塞等待特性,进行实现经典的【生产者-消费者】场景。代码有详细描述,

用最少代码演示,尽力让大家看一遍就能懂、能复用。


package lading.java.mutithread;
import cn.hutool.core.date.DateTime;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 模拟生产者-消费者,生产者生产了产品后消费者才能消费,消费者消费完产品后才能继续生产产品
 * 每次随机最多生产4件产品,消费者消费完才能继续生产
 */
public class Demo011Condition {
    //可消费的产品数量
    public static AtomicInteger productNum = new AtomicInteger(0);
    //可重入锁
    public static ReentrantLock lock = new ReentrantLock();
    //消费者信号
    public static Condition consumer = lock.newCondition();
    //生产者信号
    public static Condition producer = lock.newCondition();
    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                lock.lock();
                try {
                    //如果有产品可以消费
                    while (productNum.get() > 0) {
                        //生产者线程阻塞等待,并释放锁
                        producer.await();
                    }
                    //随机生产最多4件产品,耗时3s
                    Thread.sleep(3000);
                    int total = new Random().nextInt(4) + 1;
                    for (int i = 0; i < total; i++) {
                        System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") + " 【" + Thread.currentThread().getName() + "】新生产了产品:" + productNum.incrementAndGet());
                    }
                    //通知消费者来消费
                    System.out.println("生产者 --> 已生产" + total + "件产品,可以过来消费。");
                    consumer.signalAll();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }, "生产者").start();
        new Thread(() -> {
            while (true) {
                lock.lock();
                try {
                    //没有可以消费,消费者进入阻塞等待,释放锁
                    while (productNum.get() == 0) {
                        consumer.await();
                    }
                    //消费者消费产品,耗时2s
                    Thread.sleep(2000);
                    int total = productNum.get();
                    while (productNum.get() != 0) {
                        System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") + " 【" + Thread.currentThread().getName() + "】消费了产品:" + productNum.getAndDecrement());
                    }
                    //唤醒生产者继续生产
                    System.out.println("消费者 --> " + total + "件产品已清空,请继续生产。");
                    producer.signalAll();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }, "消费者").start();
    }
}

结果,消费者和生产者交叉运行。

生产好了产品,通知消费者消费;

消费者消费完产品,通知生产者继续生产产品。

image.png

1、Condition是什么?

    Condition很简单,它只是JUC包里的一个接口。定义了2个核心方法。

一个是await()方法,可以让线程进入阻塞等待。

另一个是signal()方法,可以唤醒指定线程。

image.png


1.1 具体看看Condition源码

      在AQS的内部类ConditionObject对这个接口进行了实现。我们看ConditionObject,他就是AQS里经典的FIFO条件等待队列。

image.png

ConditionObject的类结构图。

image.png

看了源码结构,和刚才总结一样,Condition有2大核心方法:

其中,await(),还有一个阻塞等待时间入参可选await(long time, TimeUnit unit):让当前线程进入阻塞,等待该条件唤醒,并设置等待超时时间;

然后signal()是唤醒等待该Condition的一个线程;而signalAll()是唤醒所有等待该Condition条件的线程。


2、Condition的条件等待唤醒和Object.wait()、notify()有什么区别

   问到正主了,其实这个问题也就是问:Condition的优点有哪些。最开始我们学JAVA编程,都是从synchronized开始,线程并发协调,常用的就是对象的wait(),nofity()方法。

   说到Condition的条件等待和唤醒,与JAVA对象的等待唤醒机制区别,就应该先说说ReentrantLock和synchronized的区别。这个区别在之前的文章《ReentrantLock核心原理剖析》有过分享。这里再补充几个。其中之一,就是ReentrantLock通过Condition条件队列实现多路通知,而synchronized只能单路通知。具体就是ReentrantLock可以通过多个不同的condition条件队列进行等待和唤醒,synchronized相当于功能单一的锁。

另外synchronized的锁是通过JVM实现,而ReentrantLock是通过接口api实现。


那Condition条件队列和对象的等待阻塞最大不同是什么?

一句话:Condition支持按顺序精准唤醒线程。而Object的做不到。

比如要求实现A和B执行完成后,C才能执行,而且C执行完成后才能执行D、F。任意指定协调条件,Condition都可以支持。

篇幅有限,这个精准唤醒多条件协调案例demo先不放上来了。


3、说说Condition条件队列核心原理

    那我们就说说await()方法是如何实现的,一句话,核心就是:调用await()方法后,当前线程进入阻塞,同时释放锁。

具体我们看看源码。


public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //将自己加入到AQS的FIFO等待队列尾部,进入等待    
            Node node = addConditionWaiter();
            //释放锁资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //检测本线程节点是否在AQS同步队列中,如果不在就继续检测,直到发现在AQS同步队列中
            while (!isOnSyncQueue(node)) {
                //如果在队列中,就挂起本线程
                LockSupport.park(this);
                //如果被中断,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //尝试获取锁-毕竟这期间有可能被唤醒了,以及是否发生过中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

具体await()方法里,就是线程进入了阻塞状态,以及自旋,不断判断自己NODE节点是在等待队列,还是在AQS队列。如果在AQS队列,就说明已经被唤醒了,不用再继续阻塞。如果还在等待队列,就继续自旋。


最后,我们再看看signal()唤醒方法。唤醒该条件队列线程。如果ReentrantLock是公平锁,就唤醒等待时间最长的头节点线程,如果是非公平锁,就唤醒整个等待队列。

唤醒操作,其实就是从等待队列中,将头节点挪到AQS节点,这样这个节点就被唤醒,不用再阻塞。如果是signalAll(),就是将全部等待队列节点,挪到AQS,唤醒他们。


public final void signal() {
            //判断当前线程是否持有锁,只有持有锁才能唤醒其他线程,不持有锁,就抛异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        //具体唤醒过程
        private void doSignal(Node first) {
            //尝试将头节点NODE唤醒,从等待队列中,挪到AQS队列,并通过CAS操作更新NODE状态
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

     今天就分享这么多,目前已分享synchronized、volatile、AQS、CAS、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier、Condition,并发编程里的基础中的基础已经分享完了.

     下次我们分享线程池、并发容器、ThreadLocal、Future、FutureTask。

相关文章
|
2月前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
1月前
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
118 12
|
2月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
85 12
|
2月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
223 2
|
2月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
2月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
72 3
|
Java API
java多线程 -- Condition 控制线程通信
Api文档如此定义: Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。
836 0
|
2天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
35 14
|
5天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
34 13
|
6天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。