17.AQS中的Condition是什么

简介: 大家好,我是王有志。今天和大家聊聊Condition,它为Lock接口提供了等待与唤醒功能,使Lock接口具备了与synchronized相同的能力。

欢迎关注:王有志
期待你加入Java人的提桶跑路群:共同富裕的Java人

今天来和大家聊聊ConditionCondition为AQS“家族”提供了等待与唤醒的能力,使AQS"家族"具备了像synchronized一样暂停与唤醒线程的能力。我们先来看两道关于Condition的面试题目:

  • ConditionObject的等待与唤醒有什么区别?

  • 什么是Condition队列?

接下来,我们就按照“是什么”,“怎么用”和“如何实现”的顺序来揭开Condition的面纱吧。

Condition是什么?

Condition是Java中的接口,提供了与Object#waitObject#notify相同的功能。Doug Lea在Condition接口的描述中提到了这点:

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true.

来看Condition接口中提供了哪些方法:

public interface Condition {
  void await() throws InterruptedException;

  void awaitUninterruptibly();

  long awaitNanos(long nanosTimeout) throws InterruptedException;

  boolean await(long time, TimeUnit unit) throws InterruptedException;

  boolean awaitUntil(Date deadline) throws InterruptedException;

  void signal();

  void signalAll();
}

Condition只提供了两个功能:等待(await)和唤醒(signal),与Object提供的等待与唤醒时相似的:

public final void wait() throws InterruptedException;

public final void wait(long timeoutMillis, int nanos) throws InterruptedException;

public final native void wait(long timeoutMillis) throws InterruptedException;

@HotSpotIntrinsicCandidate
public final native void notify();

@HotSpotIntrinsicCandidate
public final native void notifyAll();

唤醒功能上,ConditionObject的差异并不大:

  • Condition#signal$\approx$Object#notify

  • Condition#signalAll$=$Object#notifyAll

多个线程处于等待状态时,Object#notify()是“随机”唤醒线程,而Condition#signal则由具体实现决定如何唤醒线程,如:ConditionObject唤醒的是最早进入等待的线程但两个方法均只唤醒一个线程。

等待功能上,ConditionObject的共同点是:都会释放持有的资源Condition释放锁Object释放Monitor,即进入等待状态后允许其他线程获取锁/监视器。主要的差异体现在Condition支持了更加丰富的场景,通过一张表格来对比下:

Condition方法 Object方法 解释
Condition#await() Object#wait() 暂停线程,抛出线程中断异常
Condition#awaitUninterruptibly() / 暂停线程,不抛出线程中断异常
Condition#await(time, unit) Object#wait(timeoutMillis, nanos) 暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true
Condition#awaitUntil(deadline) / 暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true
Condition#awaitNanos(nanosTimeout) / 暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时

除了以上差异外,Condition还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而Object只有一个等待队列。《Java并发编程的艺术》中也有一张类似的表格,放在这里供大家参考:

图1:Condition与Object的对比.png

Tips

  • 实际上signal翻译为唤醒并不恰当~~

  • 涉及到Condition的实现部分,下文通过AQS中的ConditionObject详细解释。

Condition怎么用?

既然ConditionObject提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?

与调用Object#waitObject#notifyAll必须处于synchronized修饰的代码中一样(获取Monitor),调用Condition#awaitCondition#signalAll的前提是要先获取锁。但不同的是,使用Condition前,需要先通过锁去创建Condition

ReentrantLock中提供的Condition为例,首先是创建Condition对象:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

然后是获取锁并调用await方法:

new Thread(() -> {
  lock.lock();
  try {
    condition.await();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  lock.unlock();
}

最后,通过调用singalAll唤醒全部阻塞中的线程:

new Thread(() -> {
  lock.lock();
  condition.signalAll();
  lock.unlock();
}

ConditionObject的源码分析

作为接口Condition非常惨,因为在Java中只有AQS中的内部类ConditionObject实现了Condition接口:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

  public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;

    private transient Node lastWaiter;
  }

  static final class Node {
    // 省略
  }
}

ConditionObject只有两个Node类型的字段,分别是链式结构中的头尾节点,ConditionObject就是通过它们实现的等待队列。那么ConditionObject的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析。

await方法的实现

Condition接口中定义了4个线程等待的方法:

  • void await() throws InterruptedException

  • void awaitUninterruptibly();

  • long awaitNanos(long nanosTimeout) throws InterruptedException;

  • boolean await(long time, TimeUnit unit) throws InterruptedException;

  • boolean awaitUntil(Date deadline) throws InterruptedException;

方法虽然很多,但它们之间的差异较小,只体现在时间的处理上,我们看其中最常用的方法:

public final void await() throws InterruptedException {
  // 线程中断,抛出异常
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  // 注释1:加入到Condition的等待队列中
  Node node = addConditionWaiter();
  // 注释2:释放持有锁(调用AQS的release)
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 注释3:判断是否在AQS的等待队列中
  while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    // 中断时退出方法
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
      break;
    }
  }

  // 加入到AQS的等待队列中,调用AQS的acquireQueued方法
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
    interruptMode = REINTERRUPT;
  }

  // 断开与Condition队列的联系
  if (node.nextWaiter != null) {
    unlinkCancelledWaiters();
  }

  if (interruptMode != 0) {
   reportInterruptAfterWait(interruptMode);
  }
}

注释1的部分,调用addConditionWaiter方法添加到Condition队列中:

private Node addConditionWaiter() {
  // 判断当前线程是否为持有锁的线程
  if (!isHeldExclusively()) {
    throw new IllegalMonitorStateException();
  }

  // 获取Condition队列的尾节点
  Node t = lastWaiter;
  // 断开不再位于Condition队列的节点
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
  }

  // 创建Node.CONDITION模式的Node节点
  Node node = new Node(Node.CONDITION);
  if (t == null) {
    // 队列为空的场景,将node设置为头节点
    firstWaiter = node;
  } else {
    // 队列不为空的场景,将node添加到尾节点的后继节点上
    t.nextWaiter = node;
  }
  // 更新尾节点
  lastWaiter = node;
  return node;
}

可以看到,Condition的队列是一个朴实无华的双向链表,每次调用addConditionWaiter方法,都会加入到Condition队列的尾部。

注释2的部分,释放线程持有的锁,同时移出AQS的队列,内部调用了AQS的release方法:

=final int fullyRelease(Node node) {
  try {
    int savedState = getState();
    if (release(savedState)) {
      return savedState;
    }
    throw new IllegalMonitorStateException();
  } catch (Throwable t) {
    node.waitStatus = Node.CANCELLED;
    throw t;
  }
}

因为已经分析过AQS的release方法和ReentrantLock实现的tryRelease方法,这里我们就不过多赘述了。

注释3的部分,isOnSyncQueue判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:

  • 如果isOnSyncQueue返回false,即线程不在AQS的队列中,进入自旋,调用LockSupport#park暂停线程;

  • 如果isOnSyncQueue返回true,即线程在AQS的队列中,不进入自旋,执行后续逻辑。

结合注释1和注释2的部分,Condition#await的实现原理了就很清晰了:

  • Condition与AQS分别维护了一个等待队列,而且是互斥的,即同一个节点只会出现在一个队列中

  • 当调用Condition#await时,将线程添加到Condition的队列中(注释1),同时从AQS队列中移出(注释2);

  • 接着判断线程位于的队列:

    • 位于Condition队列中,该线程需要被暂停,调用LockSupport#park

    • 位于AQS队列中,该线程正在等待获取锁。

基于以上的结论,我们已经能够猜到唤醒方法Condition#signalAll的原理了:

  • 将线程从Condition队列中移出,并添加到AQS的队列中;

  • 调用LockSupport.unpark唤醒线程。

至于这个猜想是否正确,我们接着来看唤醒方法的实现。

Tips:如果忘记了AQS中相关方法是如何实现的,可以回顾下《AQS的今生,构建出JUC的基础》。

signal和signalAll方法的实现

来看signalsignalAll的源码:

// 唤醒一个处于等待中的线程
public final void signal() {
  if (!isHeldExclusively()) {
    throw new IllegalMonitorStateException();
  }
  // 获取Condition队列中的第一个节点
  Node first = firstWaiter;
  if (first != null) {
    // 唤醒第一个节点
    doSignal(first);
  }
}

// 唤醒全部处于等待中的线程
public final void signalAll() {
    if (!isHeldExclusively()){
      throw new IllegalMonitorStateException();
    }

    Node first = firstWaiter;
    if (first != null) {
      // 唤醒所有节点
      doSignalAll(first);
    }  
}

两个方法唯一的差别在于头节点不为空的场景下,是调用doSignal唤醒一个线程还是调用doSignalAll唤醒所有线程:

private void doSignal(Node first) {
  do {
    // 更新头节点
    if ( (firstWaiter = first.nextWaiter) == null) {
      // 无后继节点的场景
      lastWaiter = null;
    }
    // 断开节点的连接
    first.nextWaiter = null;
    // 唤醒头节点
  } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

private void doSignalAll(Node first) {
  // 将Condition的队列置为空
  lastWaiter = firstWaiter = null;
  do {
    // 断开链接
    Node next = first.nextWaiter;
    first.nextWaiter = null;
    // 唤醒当前头节点
    transferForSignal(first);
    // 更新头节点
    first = next;
  } while (first != null);
}

可以看到,无论是doSignal还是doSignalAll都只是将节点移出Condition队列,而真正起到唤醒作用的是transferForSignal方法,从方法名可以看到该方法是通过“转移”进行唤醒的,我们来看源码:

final boolean transferForSignal(Node node) {
  // 通过CAS替换node的状态
  // 如果替换失败,说明node不处于Node.CONDITION状态,不需要唤醒
  if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
    return false;
  }
  // 将节点添加到AQS的队列的队尾
  // 并返回老队尾节点,即node的前驱节点
  Node p = enq(node);
  int ws = p.waitStatus;
  // 对前驱节点状态的判断
  if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
    LockSupport.unpark(node.thread);
  }
  return true;
}

transferForSignal方法中,调用enq方法将node重新添加到AQS的队列中,并返回node的前驱节点,随后对前驱节点的状态进行判断:

  • 当$ws > 0$时,前驱节点处于Node.CANCELLED状态,前驱节点退出锁的争抢,node可以直接被唤醒;

  • 当$ws \leq 0$时,通过CAS修改前驱节点的状态为Node.SIGNAL,设置失败时,直接唤醒node

AQS的今生,构建出JUC的基础》中介绍了waitStatus的5种状态,其中Node.SIGNAL状态表示需要唤醒后继节点。另外,在分析shouldParkAfterFailedAcquire方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为Node.SIGNAL

最后来看enq的实现:

private Node enq(Node node) {
  for (;;) {
    // 获取尾节点
    Node oldTail = tail;
    if (oldTail != null) {
      // 更新当前节点的前驱节点
      node.setPrevRelaxed(oldTail);
      // 更新尾节点
      if (compareAndSetTail(oldTail, node)) {
        oldTail.next = node;
        // 返回当前节点的前驱节点(即老尾节点)
        return oldTail;
      }
    } else {
      initializeSyncQueue();
    }
  }
}

enq的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?

图解ConditionObject原理

功能上,Condition实现了AQS版Object#waitObject#notify,用法上也与之相似,需要先获取锁,即需要在lockunlock之间调用。原理上,简单来说就是线程在AQS的队列和Condition的队列之间的转移

线程t持有锁

假设有线程t已经获取了ReentrantLock,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:

图2:ReentrantLock的等待队列.png

线程t执行Condition#await

如果线程t中调用了Condition#await方法,线程t进入Condition的等待队列中,线程t1获取ReentrantLock,并从AQS的队列中移出,结构如下:

图3:ReentrantLock中两个队列.png

线程t1执行Condition#await

如果线程t1中也执行了Condition#await方法,同样线程t1进入Condition队列中,线程t2获取到ReentrantLock,结构如下:

图4:ReentrantLock中两个队列2.png

线程t2执行Condition#signal

如果线程t2执行了Condition#signal,唤醒Condition队列中的第一个线程,此时结构如下:

图5:ReentrantLock中两个队列3.png

通过上面的流程,我们就可以得到线程是如何在Condition队列与AQS队列中转移的:

图6:队列间的转换.png

结语

关于Condition的内容到这里就结束了,无论是理解,使用还是剖析原理,Condition的难度并不高,只不过大家可能平时用得比较少,因此多少有些陌生。

最后,截止到文章发布,我应该是把开头两道题目的题解写完了吧~~


好了,今天就到这里了,Bye~~

目录
相关文章
|
7月前
|
SQL IDE Java
hibernate5 Cannot create TypedQuery for query with more than one return using requested result type
hibernate5 Cannot create TypedQuery for query with more than one return using requested result type
80 0
|
7月前
|
SQL Java 数据库连接
【问题解决】nested exception is org.apache.ibatis.exceptions.TooManyResultException:Expected one result
【问题解决】nested exception is org.apache.ibatis.exceptions.TooManyResultException:Expected one result
|
7月前
Caused by: org.apache.ibatis.type.TypeException: Could not resolve type alias ‘order‘. Caus
Caused by: org.apache.ibatis.type.TypeException: Could not resolve type alias ‘order‘. Caus
124 0
|
7月前
|
数据库连接 测试技术 数据库
什么是`with...as`语句?
【4月更文挑战第5天】`with...as`语句是Python中的上下文管理器,用于自动处理资源的分配和释放,常见于文件操作。基本语法是`with expression as variable:`,在代码块内使用`variable`操作资源,离开时资源自动关闭。示例中展示了文件操作,自定义上下文管理器(如数据库连接),以及`contextlib`模块的使用,简化资源管理
120 1
|
7月前
|
SQL 关系型数据库 MySQL
mysql查询语句的访问方法const、ref、ref_or_null、range、index、all
mysql查询语句的访问方法const、ref、ref_or_null、range、index、all
|
数据库
解决numpy.core._exceptions.UFuncTypeError: ufunc ‘add‘ did not contain a loop with signature matching
解决numpy.core._exceptions.UFuncTypeError: ufunc ‘add‘ did not contain a loop with signature matching
1224 0
解决numpy.core._exceptions.UFuncTypeError: ufunc ‘add‘ did not contain a loop with signature matching
|
Java 数据库连接 mybatis
@TableId(type = IdType.ASSIGN_ID)
@TableId(type = IdType.ASSIGN_ID)
186 1
|
JavaScript API
Array.apply(null,{length: 99}) 逻辑解析
Array.apply(null,{length: 99}) 逻辑解析
91 0
|
XML Java 数据库连接
nested exception is org.apache.ibatis.executor.ExecutorException: A query was run and no Result Maps
nested exception is org.apache.ibatis.executor.ExecutorException: A query was run and no Result Maps
274 0
Could not resolve element type of Iterable type xxxxx.RequestParam java.util.List<?>. Not declared?
Could not resolve element type of Iterable type xxxxx.RequestParam java.util.List<?>. Not declared?
Could not resolve element type of Iterable type xxxxx.RequestParam java.util.List<?>. Not declared?