剑指JUC原理-6.wait notify(上):https://developer.aliyun.com/article/1413609
同步模式之保护性暂停
定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点:
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
实现
class GuardedObject { private Object response; private final Object lock = new Object(); public Object get() { synchronized (lock) { // 条件不满足则等待 while (response == null) { try { lock.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete(Object response) { synchronized (lock) { // 条件满足,通知等待线程 this.response = response; lock.notifyAll(); } } }
应用
一个线程等待另一个线程的执行结果
join的局限性是只能等第二个线程结束
用join那种办法,等待结果那个变量只能设计成全局的,而使用保护性暂停的话可以设计成局部的
public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { try { // 子线程执行下载 List<String> response = download();// 执行下载操作,去完成网页的信息 log.debug("download complete..."); guardedObject.complete(response); } catch (IOException e) { e.printStackTrace(); } }).start(); log.debug("waiting..."); // 主线程阻塞等待 Object response = guardedObject.get(); log.debug("get response: [{}] lines", ((List<String>) response).size()); }
执行结果:
08:42:18.568 [main] c.TestGuardedObject - waiting... 08:42:23.312 [Thread-0] c.TestGuardedObject - download complete... 08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines
可以看到,是存在等待时间的,大概5s左右
带超时版 GuardedObject
如果要控制超时时间呢
class GuardedObjectV2 { private Object response; private final Object lock = new Object(); public Object get(long millis) { synchronized (lock) { // 1) 记录最初时间 long begin = System.currentTimeMillis(); // 2) 已经经历的时间 long timePassed = 0; while (response == null) { // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等 long waitTime = millis - timePassed; log.debug("waitTime: {}", waitTime); if (waitTime <= 0) { log.debug("break..."); break; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } // 3) 如果提前被唤醒,这时已经经历的时间假设为 400 timePassed = System.currentTimeMillis() - begin; log.debug("timePassed: {}, object is null {}", timePassed, response == null); } return response; } } public void complete(Object response) { synchronized (lock) { // 条件满足,通知等待线程 this.response = response; log.debug("notify..."); lock.notifyAll(); } } }
测试,没有超时
public static void main(String[] args) { GuardedObjectV2 v2 = new GuardedObjectV2(); new Thread(() -> { sleep(1); v2.complete(null); sleep(1); v2.complete(Arrays.asList("a", "b", "c")); }).start(); Object response = v2.get(2500); if (response != null) { log.debug("get response: [{}] lines", ((List<String>) response).size()); } else { log.debug("can't get response"); } }
输出:
08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500 08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify... 08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true 08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497 08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify... 08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false 08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines
测试,超时
// 等待时间不足 List<String> lines = v2.get(1500);
输出:
08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500 08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify... 08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true 08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498 08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true 08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0 08:47:56.461 [main] c.GuardedObjectV2 - break... 08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response 08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...
join原理
保护性暂停,是一个线程等待另一个线程的结果
而join是一个线程等待另一个线程的结束
进入源码查看 thread.join 和 带超时版 GuardedObject 有异曲同工之妙
进入源码可以发现,join的底层也是使用的保护性暂停的手段
是调用者轮询检查线程 alive 状态
t1.join(); 等价于下面的代码 synchronized (t1) { // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束 while (t1.isAlive()) { t1.wait(0); } }
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
新增 id 用来标识 Guarded Object
class GuardedObject { // 标识 Guarded Object private int id; public GuardedObject(int id) { this.id = id; } public int getId() { return id; } // 结果 private Object response; // 获取结果 // timeout 表示要等待多久 2000 public Object get(long timeout) { synchronized (this) { // 开始时间 15:00:00 long begin = System.currentTimeMillis(); // 经历的时间 long passedTime = 0; while (response == null) { // 这一轮循环应该等待的时间 long waitTime = timeout - passedTime; // 经历的时间超过了最大等待时间时,退出循环 if (timeout - passedTime <= 0) { break; } try { this.wait(waitTime); // 虚假唤醒 15:00:01 } catch (InterruptedException e) { e.printStackTrace(); } // 求得经历时间 passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s } return response; } } // 产生结果 public void complete(Object response) { synchronized (this) { // 给结果成员变量赋值 this.response = response; this.notifyAll(); } } }
中间解耦类
class Mailboxes { private static Map<Integer, GuardedObject> boxes = new Hashtable<>(); private static int id = 1; // 产生唯一 id private static synchronized int generateId() { return id++; } public static GuardedObject getGuardedObject(int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject() { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds() { return boxes.keySet(); } } 其中中间解耦类中的共享资源要设置成线程安全的,如synchronized 和 Hashtable
业务相关类
class People extends Thread{ @Override public void run() { // 收信 GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail); } } class Postman extends Thread { private int id; private String mail; public Postman(int id, String mail) { this.id = id; this.mail = mail; } @Override public void run() { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } }
测试
public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } }
某次运行结果
10:35:05.689 c.People [Thread-1] - 开始收信 id:3 10:35:05.689 c.People [Thread-2] - 开始收信 id:1 10:35:05.689 c.People [Thread-0] - 开始收信 id:2 10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2 10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1 10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2 10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1 10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3 10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3
异步模式之生产者/消费者
之所以是异步,是因为,产生了一个消息,并不能立刻被消费。而保护性暂停是同步的
定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
实现
class Message { private int id; private Object message; public Message(int id, Object message) { this.id = id; this.message = message; } public int getId() { return id; } public Object getMessage() { return message; } } class MessageQueue { private LinkedList<Message> queue; private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; queue = new LinkedList<>(); } public Message take() { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); queue.notifyAll(); return message; } } public void put(Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已达上限, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); queue.notifyAll(); } } }
应用
public static void main(String[] args) { MessageQueue messageQueue = new MessageQueue(2); // 4 个生产者线程, 下载任务 for (int i = 0; i < 4; i++) { int id = i; new Thread(() -> { try { System.out.println("download..."); List<String> response = Downloader.download(); System.out.println("try put message()"+ id); messageQueue.put(new Message(id, response)); } catch (InterruptedException e) { e.printStackTrace(); } }, "生产者" + i).start(); } // 1 个消费者线程, 处理结果 new Thread(() -> { while (true) { Message message = messageQueue.take(); List<String> response = (List<String>) message.getMessage(); System.out.println("take message({}):" + message.getId()); } }, "消费者").start(); }
10:48:38.070 [生产者3] c.TestProducerConsumer - download... 10:48:38.070 [生产者0] c.TestProducerConsumer - download... 10:48:38.070 [消费者] c.MessageQueue - 没货了, wait 10:48:38.070 [生产者1] c.TestProducerConsumer - download... 10:48:38.070 [生产者2] c.TestProducerConsumer - download... 10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1) 10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2) 10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0) 10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3) 10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines 10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines 10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines 10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines 10:48:41.240 [消费者] c.MessageQueue - 没货了, wait
多加几个消费者线程本质上也是一样的。
Park & Unpark
基本使用
它们是 LockSupport 类中的方法
// 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(暂停线程对象)
先 park 再 unpark
Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debug("resume..."); },"t1"); t1.start(); sleep(2); log.debug("unpark..."); LockSupport.unpark(t1);
输出
18:42:52.585 c.TestParkUnpark [t1] - start... 18:42:53.589 c.TestParkUnpark [t1] - park... 18:42:54.583 c.TestParkUnpark [main] - unpark... 18:42:54.583 c.TestParkUnpark [t1] - resume...
先 unpark 再 park
Thread t1 = new Thread(() -> { log.debug("start..."); sleep(2); log.debug("park..."); LockSupport.park(); log.debug("resume..."); }, "t1"); t1.start(); sleep(1); log.debug("unpark..."); LockSupport.unpark(t1);
输出
18:43:50.765 c.TestParkUnpark [t1] - start... 18:43:51.764 c.TestParkUnpark [main] - unpark... 18:43:52.769 c.TestParkUnpark [t1] - park... 18:43:52.769 c.TestParkUnpark [t1] - resume...
特点
与 Object 的 wait & notify 相比
- wait,notify 和 notifyAll 必须配合 Object Monitor (必须先获得monitor锁才可以)一起使用,而 park,unpark 不必
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
原理之 park & unpark
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex
原理分为两种情况
先park再unpark
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
- 线程进入 _cond 条件变量阻塞
- 设置 _counter = 0
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 唤醒 _cond 条件变量中的 Thread_0
- Thread_0 恢复运行
- 设置 _counter 为 0
先unpark后park
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
- 设置 _counter 为 0
注意:多次调用unpark也仅仅只是将 counter设置为1