共享模型之管程(1)https://developer.aliyun.com/article/1530866
6、Wait/Notify
小故事 - 为什么需要 wait
由于条件不满足,小南不能继续进行计算
但小南如果一直占用着锁,其它人就得一直阻塞,效率太低
于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,
其它人可以由老王随机安排进屋
直到小M将烟送来,大叫一声 [ 你的烟到了 ] (调用 notify 方法)
小南于是可以离开休息室,重新进入竞争锁的队列
(1)原理
- 锁对象调用wait方法(obj.wait),就会使当前线程进入WaitSet中,变为WAITING状态。
- 处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
- BLOCKED状态的线程是在竞争对象时,发现Monitor的Owner已经是别的线程了,此时就会进入EntryList中,并处于BLOCKED状态
- WAITING状态的线程是获得了对象的锁,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态
- BLOCKED状态的线程会在锁被释放的时候被唤醒,但是处于WAITING状态的线程只有被锁对象调用了notify方法(obj.notify/obj.notifyAll),才会被唤醒。
注:只有当对象被锁以后,才能调用wait和notify方法
public class Test1 { final static Object LOCK = new Object(); public static void main(String[] args) throws InterruptedException { //只有在对象被锁住后才能调用wait方法 synchronized (LOCK) { LOCK.wait(); } } }Copy
API 介绍
obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法
final static Object obj = new Object(); public static void main(String[] args) { new Thread(() - > { synchronized(obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); new Thread(() - > { synchronized(obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); // 主线程两秒后执行 sleep(2); log.debug("唤醒 obj 上其它线程"); synchronized(obj) { obj.notify(); // 唤醒obj上一个线程 // obj.notifyAll(); // 唤醒obj上所有等待线程 } }
notify 的一种结果
20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
notifyAll 的结果
19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....
wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到
notify 为止
wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify
(2)Wait与Sleep的区别
不同点
- Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
- Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁
- Sleep不需要与synchronized一起使用,而Wait需要与synchronized一起使用(对象被锁以后才能使用)
相同点
- 阻塞状态都为TIMED_WAITING
(3)优雅地使用wait/notify
什么时候适合使用wait
- 当线程不满足某些条件,需要暂停运行时,可以使用wait。这样会将对象的锁释放,让其他线程能够继续运行。如果此时使用sleep,会导致所有线程都进入阻塞,导致所有线程都没法运行,直到当前线程sleep结束后,运行完毕,才能得到执行。
使用wait/notify需要注意什么
- 当有多个线程在运行时,对象调用了wait方法,此时这些线程都会进入WaitSet中等待。如果这时使用了notify方法,可能会造成虚假唤醒(唤醒的不是满足条件的等待线程),这时就需要使用notifyAll方法
synchronized (LOCK) { while(//不满足条件,一直等待,避免虚假唤醒) { LOCK.wait(); } //满足条件后再运行 } synchronized (LOCK) { //唤醒所有等待线程 LOCK.notifyAll(); }
然后其他线程用while
7、模式之保护性暂停
(1)定义
(2)举例
public class Test2 { public static void main(String[] args) { String hello = "hello thread!"; Guarded guarded = new Guarded(); new Thread(()->{ System.out.println("想要得到结果"); synchronized (guarded) { System.out.println("结果是:"+guarded.getResponse()); } System.out.println("得到结果"); }).start(); new Thread(()->{ System.out.println("设置结果"); synchronized (guarded) { guarded.setResponse(hello); } }).start(); } } class Guarded { /** * 要返回的结果 */ private Object response; //优雅地使用wait/notify public Object getResponse() { //如果返回结果为空就一直等待,避免虚假唤醒 while(response == null) { synchronized (this) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return response; } public void setResponse(Object response) { this.response = response; synchronized (this) { //唤醒休眠的线程 this.notifyAll(); } } @Override public String toString() { return "Guarded{" + "response=" + response + '}'; } }Copy
带超时判断的暂停
public Object getResponse(long time) { synchronized (this) { //获取开始时间 long currentTime = System.currentTimeMillis(); //用于保存已经等待了的时间 long passedTime = 0; while(response == null) { //看经过的时间-开始时间是否超过了指定时间 long waitTime = time -passedTime; if(waitTime <= 0) { break; } try { //等待剩余时间 this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } //获取当前时间 passedTime = System.currentTimeMillis()-currentTime } } return response; }Copy
(3)join源码——使用保护性暂停模式
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); //一直等待 } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
wait():注意,前述提及wait操作对象一定是持有锁的对象,而join方法在方法头中含有Syschronized关键字
拓展
图中Futures就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1,t3,t5就好比邮递员
如果需要在多个类之间使用GuardedObject对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
邮递员和收信者就不需要互相传递GuardedObject对象
package cn.itcast.test; import cn.itcast.n2.util.Sleeper; import lombok.extern.slf4j.Slf4j; import java.util.Hashtable; import java.util.Map; import java.util.Set; @Slf4j(topic = "c.Test20") public class Test20 { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } } } @Slf4j(topic = "c.People") class People extends Thread{ @Override public void run() { // 收信 GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail); } } @Slf4j(topic = "c.Postman") class Postman extends Thread { private int id; private String mail; public Postman(int id, String mail) { this.id = id; this.mail = mail; } @Override public void run() { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } } class Mailboxes { private static Map<Integer, GuardedObject> boxes = new Hashtable<>(); private static int id = 1; // 产生唯一 id private static synchronized int generateId() { return id++; } public static GuardedObject getGuardedObject(int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject() { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds() { return boxes.keySet(); } } // 增加超时效果 class GuardedObject { // 标识 Guarded Object private int id; public GuardedObject(int id) { this.id = id; } public int getId() { return id; } // 结果 private Object response; // 获取结果 // timeout 表示要等待多久 2000 public Object get(long timeout) { synchronized (this) { // 开始时间 15:00:00 long begin = System.currentTimeMillis(); // 经历的时间 long passedTime = 0; while (response == null) { // 这一轮循环应该等待的时间 long waitTime = timeout - passedTime; // 经历的时间超过了最大等待时间时,退出循环 if (timeout - passedTime <= 0) { break; } try { this.wait(waitTime); // 虚假唤醒 15:00:01 } catch (InterruptedException e) { e.printStackTrace(); } // 求得经历时间 passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s } return response; } } // 产生结果 public void complete(Object response) { synchronized (this) { // 给结果成员变量赋值 this.response = response; this.notifyAll(); } } }
异步模式之生产者消费者
- 与前面的保护性暂停中的GuardObjecl不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果
- 数据消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK中各种阻塞队列,采用的就是这种模式
java之间线程进行通信,rabbitmq是进程之间进行通信
package cn.itcast.test; import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; import static cn.itcast.n2.util.Sleeper.sleep; @Slf4j(topic = "c.Test21") public class Test21 { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id , "值"+id)); }, "生产者" + i).start(); } new Thread(() -> { while(true) { sleep(1); Message message = queue.take(); } }, "消费者").start(); } } // 消息队列类 , java 线程之间通信 @Slf4j(topic = "c.MessageQueue") class MessageQueue { // 消息的队列集合 private LinkedList<Message> list = new LinkedList<>(); // 队列容量 private int capcity; public MessageQueue(int capcity) { this.capcity = capcity; } // 获取消息 public Message take() { // 检查队列是否为空 synchronized (list) { while(list.isEmpty()) { try { log.debug("队列为空, 消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 从队列头部获取消息并返回 Message message = list.removeFirst(); log.debug("已消费消息 {}", message); list.notifyAll(); return message; } } // 存入消息 public void put(Message message) { synchronized (list) { // 检查对象是否已满 while(list.size() == capcity) { try { log.debug("队列已满, 生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 将消息加入队列尾部 list.addLast(message); log.debug("已生产消息 {}", message); list.notifyAll(); } } } final class Message { private int id; private Object value; public Message(int id, Object value) { this.id = id; this.value = value; } public int getId() { return id; } public Object getValue() { return value; } @Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }