Condition 实现原理

简介: Condition 实现原理

Condition 实现原理



说 Condition 前,需要说下 ConditioObject。ConditionObject 是同步器 AbstractQueuedSynchronzied 的内部类,因为 Condition 的操作需要关联的锁。ArrayBlockingQueue 就是 Condition 的具体应用。Object 中其实 也有 wait ,notify ,notifyAll 等操作, Condition 相当于将   wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为。


应用场景 ArrayBlockingQueue


ArrayBlockingQueue 的构造函数。

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

通过构造函数,可以看到 Condition 的创建时需要关联锁的。

从队列中去取出(take)数据 。


public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

往队列中加入数据 enqueue


private void enqueue(E x) {
      // assert lock.getHoldCount() == 1;
      // assert items[putIndex] == null;
      final Object[] items = this.items;
      items[putIndex] = x;
      if (++putIndex == items.length)
          putIndex = 0;
      count++;
      notEmpty.signal();
  }

可以看主要用了 await signal 等方法。具体代表什么含义?

Condition 实现主要包含三个部分:等待队列、等待、通知

如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。


等待队列


等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。等待队列结构如下:


640.png


Condition 等待队列,也是包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入等待队列并进入等待状态。


Object 监视器模型


Object 监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:


640.png


等待


当调用 Condition  的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入等待队列
            Node node = addConditionWaiter();
            // 释放同步状态,也就是释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // node 不在节点中会一直 park 阻塞下去。达到等待的效果。
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }


调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。但是如果不是通过 Condition.signal 进行唤醒的,而是对等待线程进行中断,那么会抛出 InterruptedException。


调用 Condition  signal 方法后,当前线程会加入到等待队列,如下图所示:


640.png



通知


调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。


public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }


需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively 检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。

节点从等待队列,移动到同步队列的操作过程如下:

640.png


通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。

被唤醒的线程,将从 await() 方法中的 while 循环中退出。从 await 方法看


//  当前节点已经在同步队列了,不会在循环下去了
      while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。


final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的  await 方法返回。此时线程已经成功获得了锁。


总结


本文剖析了一下 Condition 的实现原理,等待队列,等待,通知的实现原理。


相关文章
|
3月前
Condition的awaitNanos&signalAll方法分析
Condition的awaitNanos&signalAll方法分析
30 6
|
3月前
|
算法 C++
【C++入门到精通】condition_variable(条件变量)C++11 [ C++入门 ]
【C++入门到精通】condition_variable(条件变量)C++11 [ C++入门 ]
33 0
|
10月前
|
安全 Java
JUC第八讲:Condition源码分析
JUC第八讲:Condition源码分析
|
3月前
|
存储 安全 C++
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
《C++ Concurrencyin Action》第6章--基于锁的并发数据结构设计
|
3月前
|
Java
多线程并发之显示锁Lock与其通信方式Condition源码解读
多线程并发之显示锁Lock与其通信方式Condition源码解读
34 0
|
安全 Java
一天一个 JUC 工具类 Lock 和 Condition
当谈到Java多线程编程时,我们不可避免地需要处理并发问题。为此Java提供了一个强大的工具包——java.util.concurrent(JUC)
|
API
图解ReentrantLock的条件变量Condition机制
图解ReentrantLock的条件变量Condition机制
108 0
图解ReentrantLock的条件变量Condition机制
Juc并发编程08——Condition实现源码分析
看看ReentrantLock中的newCondition方法
Juc并发编程08——Condition实现源码分析
并发编程之没有条件创造条件Condition
多线程编程必会内容, 锁条件Lock.Condition
100 0