1. Condition的应用
在上一篇文章我们用Condition实现了线程交替打印0和1功能。调用Condition的await(),能实现将当前线程释放获取到的相应的锁。并且阻塞当前线程,直到其他线程调用了同一Condition的signal(),如果有多个线程在同一个Condition上调用了await()方法,那么这些线程将会被封装成一个Node节点,加入到Condition内部维护的单链表的尾部。调用了Condition的signal()。将会把Condition内部的单链表表首的Node节点,放入到Condition所在锁的AQS中竞争锁。
接下来我将使用ReentrantLock和Condition来模拟下学生等待下课铃响出去玩的情景
package com.peter.tips.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by jiangbin on 2018/6/4. */ public class StudentAndBell { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Thread student = new Thread(){ @Override public void run() { super.run(); try { lock.lock(); System.out.println("我等下课铃响,出去耍~"); condition.await();//释放锁,等待下课铃声响起 System.out.println("出去耍~"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }; Thread bell = new Thread(){ @Override public void run() { super.run(); try { lock.lock(); System.out.println("下课铃响了...."); condition.signal();//通知学生下课了 } finally { lock.unlock(); } } }; student.start(); bell.start(); try { student.join(); bell.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main"); } }
输出结果如下:
t1 print
t2 print
t1 after await
main
2. Condition实现原理
首先我们来看下Condition的定义
java.util.concurrent.locks.Condition.java public interface Condition { /** * 阻塞当前线程,直到该线程通过signal重新唤醒 * {@linkplain Thread#interrupt interrupted}. * / void await() throws InterruptedException; /** * await过程可以被中断 */ void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; /** * 唤醒一个在该Condition上等待的线程. * */ void signal(); /** * 唤醒所有在该Condition上等待的线程. * */ void signalAll(); }
Condition接口的具体实现类为ConditionObject。它是AbstractQueuedSynchronizer非静态内部类。
public class ConditionObject implements Condition, java.io.Serializable { /** 指向单向链表的头部结点 */ private transient Node firstWaiter; /** 指向单向链表的尾部结点 */ private transient Node lastWaiter; ... } 我们再看下Node的定义 static final class Node{ ...省略其他定义 //prev 是在AQS双向链表中指向 前一个结点 volatile Node prev; //next 是在AQS双向链表中指向 后一个结点 volatile Node next; //阻塞的线程 volatile Thread thread; //nextWaiter 是在Condition单向列表指向后一个节点 Node nextWaiter; }
看了上面关于ConditionObject和Node的定义,我们大概可以猜到,Condition内部维护的单向链表,当有线程调用了Condition的await(),就会将该线程封装成Node结点,并放入到单向链表尾部。当有线程调用了Condition的signal(),便会将Condition中的链表头部的节点,放入到AQS的双向链表中通过自旋(即不断循环)获取锁。signalAll()会将Condition链表中所有结点都放到AQS中自旋
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//加入到Condition的单向链表中 int savedState = fullyRelease(node);//释放当前线程获取到的锁 int interruptMode = 0; //只要不在AQS队列中就一直阻塞,当调用了signal该node就会加入到队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public final void signal() { if (!isHeldExclusively())//如果没获取到锁是会报错的 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null)//拿到等待队列的第一个Node,放入AQS队列 doSignal(first);
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null);//通知结点失败,并且链表中还有数据,一直通知直到有一个成功 }
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //结点加入到AQS队列, Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first);//把所有的节点都放到AQS列表中 }