Java中的线程协作之Condition

简介: Java中的线程协作之Condition一、Condition接口1、Condition接口的常用方法介绍复制代码 1 /** 2 * 已经获取到锁的线程调用该方法会进入等待状态,知道其他持有锁的线程通知(signal)等待队列中的线程或者被中断退出等待队列; 3 * 如果该线程已经从该方法中...

Java中的线程协作之Condition
一、Condition接口
1、Condition接口的常用方法介绍
复制代码
1 /**
2 * 已经获取到锁的线程调用该方法会进入等待状态,知道其他持有锁的线程通知(signal)等待队列中的线程或者被中断退出等待队列;
3 * 如果该线程已经从该方法中返回,表名线程已经获取到了Condition对象对应的锁
4 */
5 public final void await() throws InterruptedException {...}
6 /**
7 * 还是进入等待状态的方法,只是该方法对中断不敏感:当前调用该方法的线程只有被通知(signal)才能从等待队列中退出
8 */
9 public final void awaitUninterruptibly() {...}
10 /**
11 * 当前线程进入等待状态,被通知、中断或者超时之后被唤醒。返回值就是表示剩余的时间,即
12 * 如果在nanosTimeout纳秒之前被唤醒,返回值就是实际耗时;如果返回值是0或者负数,就认为是超时了
13 */
14 public final long awaitNanos(long nanosTimeout) {...}
15 /**
16 * 调用该方法的线程会进入等待状态直到被通知、中断或者到达某个超时时间。
17 * 意味着没有到达指定的某个时间被通知,就会返回true;如果到达指定时间,返回false
18 */
19 public final boolean awaitUntil(Date deadline){}
20 /**
21 * 当前持有Condition对象对应锁的线程,调用该方法之后会唤醒一个等待在Condition上的线程
22 */
23 public final void signal() {}
24 /**
25 * 当前持有Condition对象对应锁的线程,调用该方法之后会唤醒等待在Condition上的所有线程
26 */
27 public final void signalAll() {}
复制代码
  Condition的使用模板:Condition的获取必须通过Lock的newCondition方法,表示Condition对象与该锁关联,一般讲Condition对象作为成员变量,调用上面的await方法之后当前线程才会释放锁并在等待队列中进行等待;当其他的线程(在没有中断的情况下)调用该condition对象的signal方法的时候就会通知等待队列中的等待线程从await方法返回(返回之前已经获取锁)。

复制代码
1 Lock lock = new ReentrantLock();
2 Condition con = lock.newCondition();
3 public void conWait() {
4 lock.lock();
5 try {
6 con.await();
7 } catch(InterruptedException e) {
8 ...
9 }finally {
10 lock.unlock();
11 }
12 }
13
14 public void conSignal() {
15 lock.lock();
16 try {
17 con.signal();
18 } catch(InterruptedException e) {
19 ...
20 }finally {
21 lock.unlock();
22 }
23 }
复制代码
2、Condition的实现分析
a)源码流程分析
  我们通过跟踪源码可以看出来,首先创建锁对象(new ReentrantLock()),然后根据锁对象关联响应的Condition对象,然后通过Condition对象中维护的等待队列实现等待(await)通知(signal)机制。

复制代码
1 public Condition newCondition() { //ReentrantLock类中的方法
2 return sync.newCondition();
3 }
4 //ConditionObject类实现Condition接口,除此室外ConditionObject也是AQS的一个内部类,Condition的操作需要与锁关联起来
5 final ConditionObject newCondition() {
6 return new ConditionObject();
7 }
8 //AQS的内部类ConditionObject,其中维护了一个等待队列,通过该队列实现等待通知机制
9 public class ConditionObject{
10 /**
11 * 返回等待队列中的线程集合
12 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
13 * returns {@code false}
14 */
15 protected final Collection getWaitingThreads() {
16 if (!isHeldExclusively())
17 throw new IllegalMonitorStateException();
18 ArrayList list = new ArrayList();
19 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
20 if (w.waitStatus == Node.CONDITION) {
21 Thread t = w.thread;
22 if (t != null)
23 list.add(t);
24 }
25 }
26 return list;
27 }
28 }
复制代码
b)具体实现
  上面说到了Condition是通过等待队列来实现等待通知功能的,那么就分析等待队列和等待通知机制的实现

①等待队列实现
  等待队列是一个FIFO的队列,其中每个结点都包含一个处于Condition对象上等待的线程引用(当一个获取到锁的线程调用await方法,就会释放锁资源,被包装成一个Node然后添加到等待队列中进入等待状态;这里面的Node结点还是和AQS中的实现机理一样,Node是AQS中的静态内部类)。

  ConditionObject类中有下面两个属性,分别代表一个Condition对应的等待队列的首节点和尾结点。当前线程调用await方法之后就会被构造成一个Node结点然后加入到等待队列的尾部。

1 /* Condition等待队列头结点 /
2 private transient Node firstWaiter;
3 /* Condition等待队列尾结点 /
4 private transient Node lastWaiter;
  下面是等待队列的基本结构,Condition对象中有首尾结点的引用。新增加的结点需要将原有的尾结点的下一节点指向它,然后更新lastWaiter即可。

  上面的情况是一个Condition对象对应一个等待队列和一个同步队列(上面新添加的Node3就是从同步队列中移除然后添加过来的),在同步器组件实现中,会拥有一个同步队列和多个等待队列。

②等待操作的实现
  持有锁的线程调Condition的await方法之后会释放锁,然后进入等待状态。既然是持有锁的线程,那么该线程应该位于同步队列的首节点位置,其调用await方法之后就会从同步队列首节点移到等待队列的尾结点等待。具体将其移到等待队列是addConditionWaiter方法实现。下面是await方法和addConditionWaiter方法的实现分析。

复制代码
public final void await() throws InterruptedException {

if (Thread.interrupted())
    throw new InterruptedException();
Node node = addConditionWaiter(); //将当前线程加入等待队列
int savedState = fullyRelease(node); //释放当前线程持有的锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

}
private Node addConditionWaiter() {

Node t = lastWaiter;
/**
 * waitStatus值表示线程正在等待条件(原本结点在等待队列中,结点线程等待在Condition上,当其他线程对
 * Condition调用了signal()方法之后)该结点会从等待队列中转移到同步队列中,进行同步状态的获取 
 * static final int CONDITION = -2;
*/
if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); //构造成Condition的等待队列中的对应的结点
//增加的结点需要将原有的尾结点的下一节点指向它,然后更新lastWaiter
if (t == null)
    firstWaiter = node;
else
    t.nextWaiter = node;
lastWaiter = node;
return node;

}
复制代码

③通知操作的实现
  通知操作的实现机制就是将当前等待队列中的首节点中的线程唤醒,将其加入同步队列中。

复制代码
1 public final void signal() {
2 if (!isHeldExclusively()) //检查当前线程是否获取锁
3 throw new IllegalMonitorStateException();
4 Node first = firstWaiter;
5 if (first != null)
6 doSignal(first);
7 }
复制代码
  唤醒线程使其进入同步队列之后,我们再来看await方法中那些没有执行的代码。

复制代码
1 public final void await() throws InterruptedException {
2 if (Thread.interrupted())
3 throw new InterruptedException();
4 Node node = addConditionWaiter(); //将当前线程加入等待队列
5 int savedState = fullyRelease(node); //释放当前线程持有的锁
6 int interruptMode = 0;
7 //根据下面的源码可以看出,当前线程如果掉用await方法之后会进入等待队列,那么在退出等待队列之前会一直执行这个循环
8 while (!isOnSyncQueue(node)) {
9 LockSupport.park(this); //唤醒节点中的线程
10 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
11 break;
12 }
13 //通过acquireQueued源码可以发现,获取锁的流程和ReentrantLock这种独占式获取同步状态的流程基本一致
14 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
15 interruptMode = REINTERRUPT;
16 if (node.nextWaiter != null) // clean up if cancelled
17 unlinkCancelledWaiters();
18 if (interruptMode != 0)
19 reportInterruptAfterWait(interruptMode);
20 }
21 final boolean isOnSyncQueue(Node node) {
22 if (node.waitStatus == Node.CONDITION || node.prev == null) //判断当前队列是否在等待队列中
23 return false;
24 if (node.next != null) // If has successor, it must be on queue
25 return true;
26 return findNodeFromTail(node);
27 }
28 //竞争锁资源的同步队列
29 final boolean acquireQueued(final Node node, int arg) {
30 boolean failed = true;
31 try {
32 boolean interrupted = false;
33 for (;;) {
34 final Node p = node.predecessor(); //得到当前结点的前驱结点
35 if (p == head && tryAcquire(arg)) { //当前结点的前驱结点为头结点,并且尝试获取锁成功
36 setHead(node); //将当前获取到锁的结点设置为头结点
37 p.next = null; // help GC
38 failed = false;
39 return interrupted;
40 }
41 //如果获取同步状态失败,应该自旋等待继续获取并且校验自己的中断标志位信息
42 if (shouldParkAfterFailedAcquire(p, node) &&
43 parkAndCheckInterrupt())
44 interrupted = true;
45 }
46 } finally {
47 if (failed)
48 cancelAcquire(node);
49 }
50 }
复制代码
  从上面的代码中我们可以看出,当调用await方法的线程在没有回到同步队列之前,都会一直在while (!isOnSyncQueue(node)){...}循环中,只有被唤醒退出等待队列进入同步队列才会从循环中退出;之后调用acquireQueued()开始自旋等待锁的获取,这个自旋的过程和前面介绍的AQS中独占式锁的获取流程一样;最后,如果线程从这个自旋的过程退出了,就代表当前线程再次获取了锁资源,最后也从await方法返回。所以,一个线程调用await方法之后,只有最终获取到锁才会从该方法返回。而对于signalAll而言就是对等待队列中的每个线程通知(signal)一次,这样就可以将等待队列中的所有线程移到同步队列中进行锁资源的获取。

回到顶部
二、Condition接口使用
1、Condition接口搭配ReentrantLock实现生产者消费者模式
复制代码
1 package cn.source.condition;
2
3 import java.util.LinkedList;
4 import java.util.concurrent.TimeUnit;
5 import java.util.concurrent.locks.Condition;
6 import java.util.concurrent.locks.Lock;
7 import java.util.concurrent.locks.ReentrantLock;
8
9 public class ConditionProducerAndConsumer {
10
11 private LinkedList list = new LinkedList();
12 private static final int MAX_NUM = 10; //容器的最大数量
13 private int count = 0; //容器中实际数量
14
15 private Lock lock = new ReentrantLock();
16 private Condition producer = lock.newCondition();
17 private Condition consumer = lock.newCondition();
18
19 private int getCount() {
20 return count;
21 }
22
23 private void put(E e) {
24 lock.lock(); //首先需要获取锁
25 try {
26 //这里是判断容器是否已满,注意需要使用while:如果使用if的话可能导致所有的消费线程都处于等待状态
27 while(list.size() == MAX_NUM) {
28 System.out.println(Thread.currentThread().getName() + "正在等待中");
29 producer.await(); //生产者线程进入等待状态
30 }
31 //添加元素
32 list.add(e);
33 count ++;
34 consumer.signalAll();//将消费者线程唤醒
35 } catch (InterruptedException e1) {
36 e1.printStackTrace();
37 } finally {
38 lock.unlock();
39 }
40 }
41
42 private E get() {
43 E e = null;
44 lock.lock();
45 try {
46 while(list.size() == 0) {
47 System.out.println(Thread.currentThread().getName() + "正在等待");
48 consumer.await(); //消费者线程进入等待状态
49 }
50 e = list.removeFirst();
51 count --;
52 producer.signalAll(); //消费元素之后,将生产者线程唤醒
53 } catch (InterruptedException e1) {
54 e1.printStackTrace();
55 } finally {
56 lock.unlock();
57 }
58 return e;
59 }
60
61 public static void main(String[] args) {
62 SyncProducerAndConsumer syncProducerAndConsumer = new SyncProducerAndConsumer<>();
63 for (int i = 0; i < 10; i++) { //开启10个线程
64 new Thread(new Runnable() {
65 @Override
66 public void run() {
67 for (int j = 0; j < 5; j++) { //每个线程从容器中获取5次数据
68 System.out.println(syncProducerAndConsumer.get());
69 }
70 }
71
72 }, "消费者线程" + i).start();;
73 }
74 //休眠2秒,所有的消费者线程都已经启动并且处于等待状态
75 try {
76 TimeUnit.SECONDS.sleep(2);
77 } catch (InterruptedException e) {
78 e.printStackTrace();
79 }
80
81 for (int i = 0; i < 2; i++) { //开启两个生产者线程
82 new Thread(new Runnable() {
83 @Override
84 public void run() {
85 for (int j = 0; j < 25; j++) { //每个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
86 syncProducerAndConsumer.put("add value " + j);
87 }
88 }
89 }, "生产者线程"+i).start();
90 }
91 }
92
93 }
复制代码
2、synchronized组合wait/notify实现生产者消费者模式
复制代码
1 package cn.source.condition;
2
3 import java.util.LinkedList;
4 import java.util.concurrent.TimeUnit;
5
6 public class SyncProducerAndConsumer {
7
8 private LinkedList list = new LinkedList();
9 private static final int MAX_NUM = 10; //容器的最大数量
10 private int count = 0; //容器中实际数量
11
12 public synchronized int getCount() {
13 return count;
14 }
15
16 public synchronized void put(E e) {
17 while(list.size() == MAX_NUM) { //这里是判断容器是否已满,注意需要使用while:如果使用if的话可能导致所有的消费线程都处于等待状态
18 try {
19 this.wait(); //容器满了之后,生产者线程进入等待状态
20 } catch (InterruptedException e1) {
21 e1.printStackTrace();
22 }
23 }
24 //容器未满,生产者线程就想容器中添加数据
25 list.add(e);
26 count ++;
27 this.notifyAll(); //此时容器中已经存在数据,唤醒等待的消费者线程
28 }
29
30 public synchronized E get() {
31 E e = null;
32 while(list.size() == 0) { //判断容器是否为空,如果为空就进入等待状态,这里也使用while
33 try {
34 this.wait();
35 } catch (InterruptedException e1) {
36 e1.printStackTrace();
37 }
38 }
39 e = list.removeFirst();
40 count --;
41 this.notifyAll();
42 return e;
43 }
44
45 public static void main(String[] args) {
46 SyncProducerAndConsumer syncProducerAndConsumer = new SyncProducerAndConsumer<>();
47 for (int i = 0; i < 10; i++) { //开启10个线程
48 new Thread(new Runnable() {
49 @Override
50 public void run() {
51 for (int j = 0; j < 5; j++) { //每个线程从容器中获取5次数据
52 System.out.println(syncProducerAndConsumer.get());
53 }
54 }
55
56 }, "消费者线程" + i).start();;
57 }
58 //休眠2秒,所有的消费者线程都已经启动并且处于等待状态
59 try {
60 TimeUnit.SECONDS.sleep(2);
61 } catch (InterruptedException e) {
62 e.printStackTrace();
63 }
64
65 for (int i = 0; i < 2; i++) { //开启两个生产者线程
66 new Thread(new Runnable() {
67 @Override
68 public void run() {
69 for (int j = 0; j < 25; j++) { //每个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
70 syncProducerAndConsumer.put("add value " + j);
71 }
72 }
73 }, "生产者线程"+i).start();
74 }
75 }
76
77
78 }
复制代码
3、Object中的等待唤醒机制和Condition的等待通知机制对比
原文地址https://www.cnblogs.com/fsmly/p/10721459.html

相关文章
|
5天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
1天前
|
Java
深入理解Java中的多线程编程
本文将探讨Java多线程编程的核心概念和技术,包括线程的创建与管理、同步机制以及并发工具类的应用。我们将通过实例分析,帮助读者更好地理解和应用Java多线程编程,提高程序的性能和响应能力。
13 4
|
9天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。
|
9天前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
5天前
|
Java 调度 开发者
Java中的多线程基础及其应用
【9月更文挑战第13天】本文将深入探讨Java中的多线程概念,从基本理论到实际应用,带你一步步了解如何有效使用多线程来提升程序的性能。我们将通过实际代码示例,展示如何在Java中创建和管理线程,以及如何利用线程池优化资源管理。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和技巧,帮助你更好地理解和应用多线程编程。
|
10天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
10天前
|
算法 Java 数据处理
Java并发编程:解锁多线程的力量
在Java的世界里,掌握并发编程是提升应用性能和响应能力的关键。本文将深入浅出地探讨如何利用Java的多线程特性来优化程序执行效率,从基础的线程创建到高级的并发工具类使用,带领读者一步步解锁Java并发编程的奥秘。你将学习到如何避免常见的并发陷阱,并实际应用这些知识来解决现实世界的问题。让我们一起开启高效编码的旅程吧!
|
15天前
|
存储 Java 程序员
优化Java多线程应用:是创建Thread对象直接调用start()方法?还是用个变量调用?
这篇文章探讨了Java中两种创建和启动线程的方法,并分析了它们的区别。作者建议直接调用 `Thread` 对象的 `start()` 方法,而非保持强引用,以避免内存泄漏、简化线程生命周期管理,并减少不必要的线程控制。文章详细解释了这种方法在使用 `ThreadLocal` 时的优势,并提供了代码示例。作者洛小豆,文章来源于稀土掘金。
|
12天前
|
Java 开发者
Java中的多线程编程基础与实战
【9月更文挑战第6天】本文将通过深入浅出的方式,带领读者了解并掌握Java中的多线程编程。我们将从基础概念出发,逐步深入到代码实践,最后探讨多线程在实际应用中的优势和注意事项。无论你是初学者还是有一定经验的开发者,这篇文章都能让你对Java多线程有更全面的认识。
16 1
|
9天前
|
安全 Java UED
Java并发编程:解锁多线程的潜力
在Java的世界里,并发编程如同一场精心编排的交响乐,每个线程扮演着不同的乐手,共同奏响性能与效率的和声。本文将引导你走进Java并发编程的大门,探索如何在多核处理器上优雅地舞动多线程,从而提升应用的性能和响应性。我们将从基础概念出发,逐步深入到高级技巧,让你的代码在并行处理的海洋中乘风破浪。