Java中的线程协作之Condition

简介: Java中的线程协作之Condition一、Condition接口1、Condition接口的常用方法介绍复制代码 1 /** 2 * 已经获取到锁的线程调用该方法会进入等待状态,知道其他持有锁的线程通知(signal)等待队列中的线程或者被中断退出等待队列; 3 * 如果该线程已经从该方法中...

Java中的线程协作之Condition
一、Condition接口
1、Condition接口的常用方法介绍
复制代码
1 /**
2 * 已经获取到锁的线程调用该方法会进入等待状态,知道其他持有锁的线程通知(signal)等待队列中的线程或者被中断退出等待队列;
3 * 如果该线程已经从该方法中返回,表名线程已经获取到了Condition对象对应的锁
4 */
5 public final void await() throws InterruptedException {...}
6 /**
7 * 还是进入等待状态的方法,只是该方法对中断不敏感:当前调用该方法的线程只有被通知(signal)才能从等待队列中退出
8 */
9 public final void awaitUninterruptibly() {...}
10 /**
11 * 当前线程进入等待状态,被通知、中断或者超时之后被唤醒。返回值就是表示剩余的时间,即
12 * 如果在nanosTimeout纳秒之前被唤醒,返回值就是实际耗时;如果返回值是0或者负数,就认为是超时了
13 */
14 public final long awaitNanos(long nanosTimeout) {...}
15 /**
16 * 调用该方法的线程会进入等待状态直到被通知、中断或者到达某个超时时间。
17 * 意味着没有到达指定的某个时间被通知,就会返回true;如果到达指定时间,返回false
18 */
19 public final boolean awaitUntil(Date deadline){}
20 /**
21 * 当前持有Condition对象对应锁的线程,调用该方法之后会唤醒一个等待在Condition上的线程
22 */
23 public final void signal() {}
24 /**
25 * 当前持有Condition对象对应锁的线程,调用该方法之后会唤醒等待在Condition上的所有线程
26 */
27 public final void signalAll() {}
复制代码
  Condition的使用模板:Condition的获取必须通过Lock的newCondition方法,表示Condition对象与该锁关联,一般讲Condition对象作为成员变量,调用上面的await方法之后当前线程才会释放锁并在等待队列中进行等待;当其他的线程(在没有中断的情况下)调用该condition对象的signal方法的时候就会通知等待队列中的等待线程从await方法返回(返回之前已经获取锁)。

复制代码
1 Lock lock = new ReentrantLock();
2 Condition con = lock.newCondition();
3 public void conWait() {
4 lock.lock();
5 try {
6 con.await();
7 } catch(InterruptedException e) {
8 ...
9 }finally {
10 lock.unlock();
11 }
12 }
13
14 public void conSignal() {
15 lock.lock();
16 try {
17 con.signal();
18 } catch(InterruptedException e) {
19 ...
20 }finally {
21 lock.unlock();
22 }
23 }
复制代码
2、Condition的实现分析
a)源码流程分析
  我们通过跟踪源码可以看出来,首先创建锁对象(new ReentrantLock()),然后根据锁对象关联响应的Condition对象,然后通过Condition对象中维护的等待队列实现等待(await)通知(signal)机制。

复制代码
1 public Condition newCondition() { //ReentrantLock类中的方法
2 return sync.newCondition();
3 }
4 //ConditionObject类实现Condition接口,除此室外ConditionObject也是AQS的一个内部类,Condition的操作需要与锁关联起来
5 final ConditionObject newCondition() {
6 return new ConditionObject();
7 }
8 //AQS的内部类ConditionObject,其中维护了一个等待队列,通过该队列实现等待通知机制
9 public class ConditionObject{
10 /**
11 * 返回等待队列中的线程集合
12 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
13 * returns {@code false}
14 */
15 protected final Collection getWaitingThreads() {
16 if (!isHeldExclusively())
17 throw new IllegalMonitorStateException();
18 ArrayList list = new ArrayList();
19 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
20 if (w.waitStatus == Node.CONDITION) {
21 Thread t = w.thread;
22 if (t != null)
23 list.add(t);
24 }
25 }
26 return list;
27 }
28 }
复制代码
b)具体实现
  上面说到了Condition是通过等待队列来实现等待通知功能的,那么就分析等待队列和等待通知机制的实现

①等待队列实现
  等待队列是一个FIFO的队列,其中每个结点都包含一个处于Condition对象上等待的线程引用(当一个获取到锁的线程调用await方法,就会释放锁资源,被包装成一个Node然后添加到等待队列中进入等待状态;这里面的Node结点还是和AQS中的实现机理一样,Node是AQS中的静态内部类)。

  ConditionObject类中有下面两个属性,分别代表一个Condition对应的等待队列的首节点和尾结点。当前线程调用await方法之后就会被构造成一个Node结点然后加入到等待队列的尾部。

1 /* Condition等待队列头结点 /
2 private transient Node firstWaiter;
3 /* Condition等待队列尾结点 /
4 private transient Node lastWaiter;
  下面是等待队列的基本结构,Condition对象中有首尾结点的引用。新增加的结点需要将原有的尾结点的下一节点指向它,然后更新lastWaiter即可。

  上面的情况是一个Condition对象对应一个等待队列和一个同步队列(上面新添加的Node3就是从同步队列中移除然后添加过来的),在同步器组件实现中,会拥有一个同步队列和多个等待队列。

②等待操作的实现
  持有锁的线程调Condition的await方法之后会释放锁,然后进入等待状态。既然是持有锁的线程,那么该线程应该位于同步队列的首节点位置,其调用await方法之后就会从同步队列首节点移到等待队列的尾结点等待。具体将其移到等待队列是addConditionWaiter方法实现。下面是await方法和addConditionWaiter方法的实现分析。

复制代码
public final void await() throws InterruptedException {

if (Thread.interrupted())
    throw new InterruptedException();
Node node = addConditionWaiter(); //将当前线程加入等待队列
int savedState = fullyRelease(node); //释放当前线程持有的锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

}
private Node addConditionWaiter() {

Node t = lastWaiter;
/**
 * waitStatus值表示线程正在等待条件(原本结点在等待队列中,结点线程等待在Condition上,当其他线程对
 * Condition调用了signal()方法之后)该结点会从等待队列中转移到同步队列中,进行同步状态的获取 
 * static final int CONDITION = -2;
*/
if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); //构造成Condition的等待队列中的对应的结点
//增加的结点需要将原有的尾结点的下一节点指向它,然后更新lastWaiter
if (t == null)
    firstWaiter = node;
else
    t.nextWaiter = node;
lastWaiter = node;
return node;

}
复制代码

③通知操作的实现
  通知操作的实现机制就是将当前等待队列中的首节点中的线程唤醒,将其加入同步队列中。

复制代码
1 public final void signal() {
2 if (!isHeldExclusively()) //检查当前线程是否获取锁
3 throw new IllegalMonitorStateException();
4 Node first = firstWaiter;
5 if (first != null)
6 doSignal(first);
7 }
复制代码
  唤醒线程使其进入同步队列之后,我们再来看await方法中那些没有执行的代码。

复制代码
1 public final void await() throws InterruptedException {
2 if (Thread.interrupted())
3 throw new InterruptedException();
4 Node node = addConditionWaiter(); //将当前线程加入等待队列
5 int savedState = fullyRelease(node); //释放当前线程持有的锁
6 int interruptMode = 0;
7 //根据下面的源码可以看出,当前线程如果掉用await方法之后会进入等待队列,那么在退出等待队列之前会一直执行这个循环
8 while (!isOnSyncQueue(node)) {
9 LockSupport.park(this); //唤醒节点中的线程
10 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
11 break;
12 }
13 //通过acquireQueued源码可以发现,获取锁的流程和ReentrantLock这种独占式获取同步状态的流程基本一致
14 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
15 interruptMode = REINTERRUPT;
16 if (node.nextWaiter != null) // clean up if cancelled
17 unlinkCancelledWaiters();
18 if (interruptMode != 0)
19 reportInterruptAfterWait(interruptMode);
20 }
21 final boolean isOnSyncQueue(Node node) {
22 if (node.waitStatus == Node.CONDITION || node.prev == null) //判断当前队列是否在等待队列中
23 return false;
24 if (node.next != null) // If has successor, it must be on queue
25 return true;
26 return findNodeFromTail(node);
27 }
28 //竞争锁资源的同步队列
29 final boolean acquireQueued(final Node node, int arg) {
30 boolean failed = true;
31 try {
32 boolean interrupted = false;
33 for (;;) {
34 final Node p = node.predecessor(); //得到当前结点的前驱结点
35 if (p == head && tryAcquire(arg)) { //当前结点的前驱结点为头结点,并且尝试获取锁成功
36 setHead(node); //将当前获取到锁的结点设置为头结点
37 p.next = null; // help GC
38 failed = false;
39 return interrupted;
40 }
41 //如果获取同步状态失败,应该自旋等待继续获取并且校验自己的中断标志位信息
42 if (shouldParkAfterFailedAcquire(p, node) &&
43 parkAndCheckInterrupt())
44 interrupted = true;
45 }
46 } finally {
47 if (failed)
48 cancelAcquire(node);
49 }
50 }
复制代码
  从上面的代码中我们可以看出,当调用await方法的线程在没有回到同步队列之前,都会一直在while (!isOnSyncQueue(node)){...}循环中,只有被唤醒退出等待队列进入同步队列才会从循环中退出;之后调用acquireQueued()开始自旋等待锁的获取,这个自旋的过程和前面介绍的AQS中独占式锁的获取流程一样;最后,如果线程从这个自旋的过程退出了,就代表当前线程再次获取了锁资源,最后也从await方法返回。所以,一个线程调用await方法之后,只有最终获取到锁才会从该方法返回。而对于signalAll而言就是对等待队列中的每个线程通知(signal)一次,这样就可以将等待队列中的所有线程移到同步队列中进行锁资源的获取。

回到顶部
二、Condition接口使用
1、Condition接口搭配ReentrantLock实现生产者消费者模式
复制代码
1 package cn.source.condition;
2
3 import java.util.LinkedList;
4 import java.util.concurrent.TimeUnit;
5 import java.util.concurrent.locks.Condition;
6 import java.util.concurrent.locks.Lock;
7 import java.util.concurrent.locks.ReentrantLock;
8
9 public class ConditionProducerAndConsumer {
10
11 private LinkedList list = new LinkedList();
12 private static final int MAX_NUM = 10; //容器的最大数量
13 private int count = 0; //容器中实际数量
14
15 private Lock lock = new ReentrantLock();
16 private Condition producer = lock.newCondition();
17 private Condition consumer = lock.newCondition();
18
19 private int getCount() {
20 return count;
21 }
22
23 private void put(E e) {
24 lock.lock(); //首先需要获取锁
25 try {
26 //这里是判断容器是否已满,注意需要使用while:如果使用if的话可能导致所有的消费线程都处于等待状态
27 while(list.size() == MAX_NUM) {
28 System.out.println(Thread.currentThread().getName() + "正在等待中");
29 producer.await(); //生产者线程进入等待状态
30 }
31 //添加元素
32 list.add(e);
33 count ++;
34 consumer.signalAll();//将消费者线程唤醒
35 } catch (InterruptedException e1) {
36 e1.printStackTrace();
37 } finally {
38 lock.unlock();
39 }
40 }
41
42 private E get() {
43 E e = null;
44 lock.lock();
45 try {
46 while(list.size() == 0) {
47 System.out.println(Thread.currentThread().getName() + "正在等待");
48 consumer.await(); //消费者线程进入等待状态
49 }
50 e = list.removeFirst();
51 count --;
52 producer.signalAll(); //消费元素之后,将生产者线程唤醒
53 } catch (InterruptedException e1) {
54 e1.printStackTrace();
55 } finally {
56 lock.unlock();
57 }
58 return e;
59 }
60
61 public static void main(String[] args) {
62 SyncProducerAndConsumer syncProducerAndConsumer = new SyncProducerAndConsumer<>();
63 for (int i = 0; i < 10; i++) { //开启10个线程
64 new Thread(new Runnable() {
65 @Override
66 public void run() {
67 for (int j = 0; j < 5; j++) { //每个线程从容器中获取5次数据
68 System.out.println(syncProducerAndConsumer.get());
69 }
70 }
71
72 }, "消费者线程" + i).start();;
73 }
74 //休眠2秒,所有的消费者线程都已经启动并且处于等待状态
75 try {
76 TimeUnit.SECONDS.sleep(2);
77 } catch (InterruptedException e) {
78 e.printStackTrace();
79 }
80
81 for (int i = 0; i < 2; i++) { //开启两个生产者线程
82 new Thread(new Runnable() {
83 @Override
84 public void run() {
85 for (int j = 0; j < 25; j++) { //每个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
86 syncProducerAndConsumer.put("add value " + j);
87 }
88 }
89 }, "生产者线程"+i).start();
90 }
91 }
92
93 }
复制代码
2、synchronized组合wait/notify实现生产者消费者模式
复制代码
1 package cn.source.condition;
2
3 import java.util.LinkedList;
4 import java.util.concurrent.TimeUnit;
5
6 public class SyncProducerAndConsumer {
7
8 private LinkedList list = new LinkedList();
9 private static final int MAX_NUM = 10; //容器的最大数量
10 private int count = 0; //容器中实际数量
11
12 public synchronized int getCount() {
13 return count;
14 }
15
16 public synchronized void put(E e) {
17 while(list.size() == MAX_NUM) { //这里是判断容器是否已满,注意需要使用while:如果使用if的话可能导致所有的消费线程都处于等待状态
18 try {
19 this.wait(); //容器满了之后,生产者线程进入等待状态
20 } catch (InterruptedException e1) {
21 e1.printStackTrace();
22 }
23 }
24 //容器未满,生产者线程就想容器中添加数据
25 list.add(e);
26 count ++;
27 this.notifyAll(); //此时容器中已经存在数据,唤醒等待的消费者线程
28 }
29
30 public synchronized E get() {
31 E e = null;
32 while(list.size() == 0) { //判断容器是否为空,如果为空就进入等待状态,这里也使用while
33 try {
34 this.wait();
35 } catch (InterruptedException e1) {
36 e1.printStackTrace();
37 }
38 }
39 e = list.removeFirst();
40 count --;
41 this.notifyAll();
42 return e;
43 }
44
45 public static void main(String[] args) {
46 SyncProducerAndConsumer syncProducerAndConsumer = new SyncProducerAndConsumer<>();
47 for (int i = 0; i < 10; i++) { //开启10个线程
48 new Thread(new Runnable() {
49 @Override
50 public void run() {
51 for (int j = 0; j < 5; j++) { //每个线程从容器中获取5次数据
52 System.out.println(syncProducerAndConsumer.get());
53 }
54 }
55
56 }, "消费者线程" + i).start();;
57 }
58 //休眠2秒,所有的消费者线程都已经启动并且处于等待状态
59 try {
60 TimeUnit.SECONDS.sleep(2);
61 } catch (InterruptedException e) {
62 e.printStackTrace();
63 }
64
65 for (int i = 0; i < 2; i++) { //开启两个生产者线程
66 new Thread(new Runnable() {
67 @Override
68 public void run() {
69 for (int j = 0; j < 25; j++) { //每个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
70 syncProducerAndConsumer.put("add value " + j);
71 }
72 }
73 }, "生产者线程"+i).start();
74 }
75 }
76
77
78 }
复制代码
3、Object中的等待唤醒机制和Condition的等待通知机制对比
原文地址https://www.cnblogs.com/fsmly/p/10721459.html

相关文章
|
8天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
7天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
7天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
6天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
12天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
37 9
|
9天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
15天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
12天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
14天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
15天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
24 1
下一篇
无影云桌面