一、基本概念
保护性暂停(Guarded Suspension):用在一个线程等待另一个线程的执行结果时使用。
保护性暂停的暂停就是当条件不满足的时候就去进行wait
等待。
要点
- 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个
GuardedObject
。 - 如果有结果不断从一个线程到另一个线程那么此时就不能使用这个保护性暂停模式了,可以使用消息队列(见生产者/消费者)。
- JDK中,join的实现、Future的实现,采用的就是此模式。(用join一个线程等待另一个线程结束就可以拿到结果了,其实这也是保护性线程的一个应用)
- 因为要等待另一方的结果,因此归类到同步模式。(关于同步模式的顺序控制实现见文章:同步模式之顺序控制
线程2产生这个结果,然后线程1想要得到这个结果,那就可以让GuardedObject
充当一个桥梁,让线程1、2都关联到这个对象上。
二、单任务版GuardedObject
示例代码
t1 等待 t2线程的下载结果
import site.weiyikai.concurrent.utils.Downloader; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.List; /** * Created by lilinchao * Date 2022/10/20 * Description 单任务版GuardedObject (t1 等待 t2线程的下载结果) */ @Slf4j(topic = "c.GuardedTest01") public class GuardedTest01 { public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { // 等待结果 log.debug("等待结果"); List<String> list = (List<String>) guardedObject.get(); log.debug("结果大小:{}", list.size()); }, "t1").start(); new Thread(() -> { log.debug("执行下载"); try { //读取下载来的数据到List集合当中 List<String> list = Downloader.download(); //产生结果 guardedObject.complete(list); } catch (IOException e) { e.printStackTrace(); } }, "t2").start(); } } /** * wait/notifyAll使用 */ class GuardedObject { // 结果 private Object response; private final Object lock = new Object(); // 获取结果(没有结果将处于一直等待状态,等待结果的产生) public Object get() { synchronized (lock) { // 没有结果则一直等待 // 防止虚假唤醒 while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } // 产生结果 public void complete(Object response) { synchronized (lock) { // 给成员变量赋值 this.response = response; // 产生结果,通知等待线程 lock.notifyAll(); } } }
- Downloader代码
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; public class Downloader { public static List<String> download() throws IOException { HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection(); List<String> lines = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { lines.add(line); } } return lines; } }
运行结果
23:45:31.032 c.GuardedTest01 [t2] - 执行下载 23:45:31.032 c.GuardedTest01 [t1] - 等待结果 23:45:32.409 c.GuardedTest01 [t1] - 结果大小:3
升级代码,设置超时时间
如果超过时间还没返回结果,此时就不等了,退出while循环
import site.weiyikai.concurrent.utils.Sleeper; import lombok.extern.slf4j.Slf4j; /** * Created by lilinchao * Date 2022/10/20 * Description 带超时时间的阻塞获取结果 */ @Slf4j(topic = "c.GuardedTest02") public class GuardedTest02 { public static void main(String[] args) { // 线程1等待线程2的下载结果 GuardedObjectTime guardeObject = new GuardedObjectTime(); new Thread(() -> { log.debug("begin"); Object obj = guardeObject.get(2000); log.debug("结果是:{}", obj); }, "t1").start(); new Thread(() -> { log.debug("begin"); Sleeper.sleep(1); // 在等待时间内 // Sleeper.sleep(3);// 超时的情况 guardeObject.complete(new Object()); }, "t2").start(); } } class GuardedObjectTime { // 结果 private Object response; // 获取结果 // timeout表示等待多久 public Object get(long timeout) { synchronized (this) { // 假如开始时间为 15:00:00 long begin = System.currentTimeMillis(); // 经历的时间 long passedTime = 0; while (response == null) { // 这一轮循环应该等待的时间(假设 timeout 是 1000,结果在 400 时被唤醒了,那么还有 600 要等) long waitTime = timeout - passedTime; // 经历的时间超过了最大等待时间, 退出循环 if (waitTime <= 0) { break; } try { // 等待的时间应该 超时时间(timeout) - 经历的时间(passedTime) this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } // 经历时间 passedTime = System.currentTimeMillis() - begin; // 15:00:02 } return response; } } // 产生结果 public void complete(Object response) { synchronized (this) { // 给结果变量赋值 this.response = response; this.notifyAll(); } } }
- 在等待时间内的运行情况
23:50:19.739 c.GuardedTest02 [t1] - begin 23:50:19.739 c.GuardedTest02 [t2] - begin 23:50:20.745 c.GuardedTest02 [t1] - 结果是:java.lang.Object@2e10de1
- 超时的情况
23:52:16.857 c.GuardedTest02 [t1] - begin 23:52:16.857 c.GuardedTest02 [t2] - begin 23:52:18.861 c.GuardedTest02 [t1] - 结果是:null
三、原理之join
t.join( )方法阻塞调用此方法的线程(calling thread)进入 TIMED_WAITING 状态,直到线程 t 执行完成,此线程再继续;
通常用于在main( )主线程内,等待其它线程完成再结束main( )主线程
- 调用者轮询检查线程 alive 状态
t1.join();
- 等价于下面的代码
synchronized (t1) { // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束 // 此处t1线程对象作为了锁 while (t1.isAlive()) { // 调用线程进了锁t1的waitSet // 注意,调用线程不是t1,t1此处是作为锁而不是作为线程 // 调用线程是其他线程,一般是主线程 t1.wait(0); } }
join源码
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { // 和上面的超时增强原理一样 while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
- 当millis==0时,会进入while( isAlive( ) )循环;即只要子线程是活的,主线程就不停的等待。
- wait( )的作用是让“当前线程”等待,而这里的“当前线程”是指当前运行的线程。虽然是调用子线程的wait( )方法,但是它是通过“主线程”去调用的;所以,休眠的是主线程,而不是“子线程”!
这样理解: 例子中的Thread t只是一个对象 , isAlive( )判断当前对象(例子中的t对象)是否存活, wait()阻塞的是当前执行的线程(一般是main方法)
可以看出,Join方法实现是通过wait( )。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait( ),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。
四、多任务版 GuardedObject
- 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
- 不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个产生结果的线程和使用结果的线程是一一对应的关系,但是生产者消费者模式并不是。
- rpc框架的调用中就使用到了这种模式。
示例代码
送信收信案例
import site.weiyikai.concurrent.utils.Sleeper; import lombok.extern.slf4j.Slf4j; import java.util.Hashtable; import java.util.Map; import java.util.Set; /** * 多任务版 GuardedObject */ @Slf4j(topic = "c.GuardedTest03") public class GuardedTest03 { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } } } @Slf4j(topic = "c.People") class People extends Thread{ @Override public void run() { // 收信 GuardedObject03 guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail); } } @Slf4j(topic = "c.Postman") class Postman extends Thread { private int id; private String mail; public Postman(int id, String mail) { this.id = id; this.mail = mail; } @Override public void run() { GuardedObject03 guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } } class Mailboxes { private static Map<Integer, GuardedObject03> boxes = new Hashtable<>(); private static int id = 1; // 产生唯一 id private static synchronized int generateId() { return id++; } // 根据id得到唯一的GuardedObject,用完给到收信人得移除GuardedObject public static GuardedObject03 getGuardedObject(int id) { return boxes.remove(id); } // 产生GuardedObject public static GuardedObject03 createGuardedObject() { GuardedObject03 go = new GuardedObject03(generateId()); boxes.put(go.getId(), go); return go; } // 得到map中的所有键,也就是id public static Set<Integer> getIds() { return boxes.keySet(); } } // 增加超时效果 class GuardedObject03 { // 标识 Guarded Object private int id; public GuardedObject03(int id) { this.id = id; } public int getId() { return id; } // 结果 private Object response; // 获取结果 // timeout 表示要等待多久 2000 public Object get(long timeout) { synchronized (this) { // 开始时间 15:00:00 long begin = System.currentTimeMillis(); // 经历的时间 long passedTime = 0; while (response == null) { // 这一轮循环应该等待的时间 long waitTime = timeout - passedTime; // 经历的时间超过了最大等待时间时,退出循环 if (timeout - passedTime <= 0) { break; } try { this.wait(waitTime); // 虚假唤醒 15:00:01 } catch (InterruptedException e) { e.printStackTrace(); } // 求得经历时间 passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s } return response; } } // 产生结果 public void complete(Object response) { synchronized (this) { // 给结果成员变量赋值 this.response = response; this.notifyAll(); } } }
运行结果
00:13:15.571 c.People [Thread-1] - 开始收信 id:1 00:13:15.571 c.People [Thread-2] - 开始收信 id:2 00:13:15.571 c.People [Thread-0] - 开始收信 id:3 00:13:16.570 c.Postman [Thread-3] - 送信 id:3, 内容:内容3 00:13:16.570 c.People [Thread-0] - 收到信 id:3, 内容:内容3 00:13:16.570 c.Postman [Thread-4] - 送信 id:2, 内容:内容2 00:13:16.571 c.People [Thread-2] - 收到信 id:2, 内容:内容2 00:13:16.571 c.Postman [Thread-5] - 送信 id:1, 内容:内容1 00:13:16.571 c.People [Thread-1] - 收到信 id:1, 内容:内容1
五、异步模式之生产者/消费者
5.1 定义
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(一个生产一个消费)
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
与保护性暂停的共同点:都是在多个线程之间进行数据传输;
异步模式中, 生产者产生消息之后消息没有被立刻消费
同步模式中, 消息在产生之后被立刻消费了。
示例
线程间通信的消息队列
注:只是简单实现,功能很基础,了解核心原理即刻
import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; import static site.weiyikai.concurrent.utils.Sleeper.sleep; /** * Created by lilinchao * Date 2022/10/20 * Description 生产者和消费者示例 */ @Slf4j(topic = "c.Test04") public class Test04 { public static void main(String[] args) { //1.确定消息队列容量 MessageQueue queue = new MessageQueue(2); //2.生产者线程 for (int i = 0; i < 3; i++) { int id=i; new Thread(()->{ try { queue.put(new Message(id,"值"+id)); } catch (InterruptedException e) { e.printStackTrace(); } },"生产者"+i).start(); } //3.消费者线程 new Thread(()->{ //3.1每隔1s取一条消息 while(true){ try { sleep(2); Message message = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者").start(); } } /** * 消息队列类,java线程之间通信 */ @Slf4j(topic = "c.MessageQueue") class MessageQueue{ //1.消息队列集合 private LinkedList<Message> list=new LinkedList<>(); //2.规定容量 private int capcity; //3.通过构造方法规定消息队列容量 public MessageQueue(int capcity) { this.capcity = capcity; } //4.获取消息 public Message take() throws InterruptedException { synchronized (list) { //4.1判断队列是否为空 while (list.isEmpty()) { log.debug("队列为空,消费者线程等待"); list.wait(); } //4.2取出队列中头元素并且返回 Message message = list.removeFirst(); log.debug("已消费一个信息"); //4.3元素取出一个后需要告诉元素-1 list.notifyAll(); return message; } } //5.存入消息 public void put(Message message) throws InterruptedException { synchronized (list){ //5.1检查队列容量是否满? while(list.size()==capcity){ log.debug("队列已满,生产者线程等待"); list.wait(); } //5.2将消息加入队列尾部 list.addLast(message); log.debug("已生产一个信息"); list.notifyAll(); } } } /** * 邮件类,里面放邮件内容及对应id * ——>为了实现安全,不能set(只能通过构造方法赋初值)只能get */ final class Message{ private int id; private Object value; public Message(int id, Object value) { this.id = id; this.value = value; } public int getId() { return id; } public Object getValue() { return value; } @Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }
运行结果
00:23:23.322 c.MessageQueue [生产者0] - 已生产一个信息 00:23:23.324 c.MessageQueue [生产者2] - 已生产一个信息 00:23:23.324 c.MessageQueue [生产者1] - 队列已满,生产者线程等待 00:23:25.321 c.MessageQueue [消费者] - 已消费一个信息 00:23:25.321 c.MessageQueue [生产者1] - 已生产一个信息 00:23:27.322 c.MessageQueue [消费者] - 已消费一个信息 00:23:29.323 c.MessageQueue [消费者] - 已消费一个信息 00:23:31.323 c.MessageQueue [消费者] - 队列为空,消费者线程等待