同步模式之保护性暂停
1. 定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
2. 简单版 GuardedObject
/** * @author lxy * @version 1.0 * @Description 一个线程等待另一个线程的执行结果 * @date 2022/6/23 15:13 */ @Slf4j(topic = "c.Test13") public class Test13 { //线程1 等待 线程2 的下载结果 public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(()->{ log.debug("等待结果"); List <String> list = (List <String>) guardedObject.get(); log.debug("结果大小:{}",list.size()); },"t1").start(); new Thread(()->{ log.debug("执行下载"); try { List <String> list = Downloader.download(); guardedObject.create(list); } catch (IOException e) { e.printStackTrace(); } },"t2").start(); } } class GuardedObject{ //结果 private Object response; //获取结果 public Object get(){ synchronized (this){ while (response == null){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } //产生结果 public void create(Object response){ synchronized (this){ //给结果成员变量赋值 this.response = response; this.notifyAll(); } } }
3. 带超时版 GuardedObject
/** * @author lxy * @version 1.0 * @Description 带超时版 GuardedObject * @date 2022/6/24 11:37 */ @Slf4j(topic = "c.Test14") public class Test14 { //线程 1 等待 线程2的下载结束 public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(()->{ log.debug("begin"); Object response = guardedObject.get(2000); log.debug("结果是{}",response); },"t1").start(); new Thread(()->{ log.debug("begin"); // 情况一 Sleeper.sleep(1); guardedObject.create(new Object()); //情况二 // Sleeper.sleep(3); // guardedObject.create(new Object()); // 情况三 // Sleeper.sleep(1); // guardedObject.create(null); },"t2").start(); } } class GuardedObject{ //结果 private Object response; //获取结果 public Object get(long timeout){ synchronized (this){ //开始时间 long begin = System.currentTimeMillis(); //经历的时间 long passedTime = 0; while (response == null){ long waitTime = timeout - passedTime; //经历的时间超过了最大等待时间,退出循环 if(waitTime <= 0){ break; } try { this.wait(waitTime);//为了防止虚假唤醒,等待时间为waitTime } catch (InterruptedException e) { e.printStackTrace(); } //求经历时间 passedTime = System.currentTimeMillis()-begin; } return response; } } //产生结果 public void create(Object response){ synchronized (this){ //给结果成员变量赋值 this.response = response; this.notifyAll(); } } }
*. 原理之 join
4. 多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右
侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,
这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
新增 id 用来标识 Guarded Object
class GuardedObject{ private int id; //... }
中间解耦类
/** * * 注意:这里的MailBoxes选择了静态类而不是选择单例,为什么? * 因为MailBoxes就相当于工具类呀!!!!这就是个处理读写共享资源的工具类,为了解藕而这么写的,和MVC架构差不多 */ //邮件类 class Mailboxes{ //由于boxes是线程安全的,所以多线程对其操作也不再使用synchronized修饰 private static Map<Integer, GuardedObject> boxes = new Hashtable <>(); //表示信件id private static int id = 1; //产生唯一id private static synchronized int generateId(){ return id++; } //根据id获取邮件 public static GuardedObject getGuardedObject(int id){ return boxes.remove(id); } //将一个邮件放在邮箱中 public static GuardedObject createGuardedObject(){ GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(),go); return go; } //获得所有待送邮件的id public static Set<Integer> getIds(){ return boxes.keySet(); } }
业务相关类
//邮差类 @Slf4j(topic = "c.Postman") class Postman extends Thread{ private int id; private String mail;//信件内容 public Postman(int id, String mail) { this.id = id; this.mail = mail; } @Override public void run() { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}",id,mail); guardedObject.create(mail); } } //居民类 @Slf4j(topic = "c.People") class People extends Thread{ @Override public void run() { //收信 GuardedObject guardedObject = Mailboxes.createGuardedObject();//创建信件 log.debug("开始收信 id:{}",guardedObject.getId()); Object mail = guardedObject.get(5000);//进行收件 log.debug("收到信 id:{},内容:{}",guardedObject.getId(),mail); } }
测试:
@Slf4j(topic = "c.Test15") public class Test15 { public static void main(String[] args) { for (int i = 0; i < 3; i++) { new People().start();//初始化信件和用户线程,有多少个用户线程就对应着有多少信件 } Sleeper.sleep(1); for (Integer id : Mailboxes.getIds()) { new Postman(id,"内容"+id).start();//邮递员开始根据id将信送入信箱 } } }
运行结果:
同步模式之顺序控制
1. 固定运行顺序
比如,必须先t2线程打印 2 后 t1线程再 打印1。
1.1 wait notify 版
/** * @author lxy * @version 1.0 * @Description 固定线程运行顺序:先让线程2执行,再线程1执行 * @date 2022/7/5 15:46 */ @Slf4j(topic = "c.Test21") public class Test21 { static final Object lock = new Object(); //表示t2是否运行过 static boolean t2runned = false; public static void main(String[] args) { Thread t1 = new Thread(() -> { synchronized (lock) { while (!t2runned) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("1"); } }, "t1"); Thread t2 = new Thread(() -> { synchronized (lock) { log.debug("2"); t2runned = true; lock.notify(); } }, "t2"); t1.start(); t2.start(); } }
输出:
15:39:39.232 c.Test21 [t2] - 2 15:39:39.234 c.Test21 [t1] - 1
1.2 await signal 版
/** * @author lxy * @version 1.0 * @Description 固定线程运行顺序:先让线程2执行,再线程1执行 * @date 2022/7/5 15:46 */ @Slf4j(topic = "c.Test22") public class Test22 { static ReentrantLock ROOM = new ReentrantLock(); static Condition waitSet = ROOM.newCondition(); //表示t2是否运行过 static boolean t2runned = false; public static void main(String[] args) { Thread t1 = new Thread(() -> { ROOM.lock(); try { while (!t2runned) { try { waitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("1"); } finally { ROOM.unlock(); } }, "t1"); Thread t2 = new Thread(() -> { ROOM.lock(); try { log.debug("2"); t2runned = true; waitSet.signal(); } finally { ROOM.unlock(); } }, "t2"); t1.start(); t2.start(); } }
1.3 Park Unpark版本
可以看到,实现上很麻烦:
首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
@Slf4j(topic = "c.Test23") public class Test23 { public static void main(String[] args) { Thread t1 = new Thread(() -> { LockSupport.park();//阻塞t1线程 log.debug("1"); }, "t1"); t1.start(); new Thread(()->{ log.debug("2"); LockSupport.unpark(t1);//让t1开始运行 },"t2").start(); } }
2. 交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
2.1 wait notify 版
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/5 17:19 */ public class Test24 { public static void main(String[] args) { WaitNotify wn = new WaitNotify(1, 5);//定义公共锁对象 new Thread(()->{ wn.print("a",1,2); },"t1").start(); new Thread(()->{ wn.print("b",2,3); },"t2").start(); new Thread(()->{ wn.print("c",3,1); },"t3").start(); } } /** * 输出内容 等待标记 下一个标记 * a 1 2 * b 2 3 * c 3 1 * * 关键点:使用waitFlag,来决定本线程是等待还是继续向运行。使用nextFlag来让哪个线程放弃等待,继续运行 * */ class WaitNotify{ private int flag; private int loopNumber; public WaitNotify(int flag, int loopNumber) { this.flag = flag; this.loopNumber = loopNumber; } //打印 public void print(String str,int waitFlag,int nextFlag){ for (int i = 0; i < 5; i++) {//连续5轮 synchronized (this){ while (flag != waitFlag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag;//更新标记 this.notifyAll();//唤醒阻塞的线程,让符合的进行执行 } } } }
2.2 Lock 条件变量版
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/5 17:55 */ public class Test25 { public static void main(String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5); Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); new Thread(()->{ awaitSignal.print("a",a,b); },"t1").start(); new Thread(()->{ awaitSignal.print("b",b,c); },"t2").start(); new Thread(()->{ awaitSignal.print("c",c,a); },"t3").start(); Sleeper.sleep(1); awaitSignal.lock(); try { System.out.println("开始打印..."); a.signal(); }finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock{ private int loopNumber; public AwaitSignal(int loopNumber){ this.loopNumber = loopNumber; } //打印数据。 current:当前线程要进入哪一间休息室。next:下一间休息室 public void print(String str, Condition current,Condition next){ for (int i = 0; i < 5; i++) { lock(); try { current.await(); System.out.print(str); next.signal();//敲醒另外一个线程,打印数据 } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } }
输出:
开始打印... abcabcabcabcabc
2.3 Park Unpark 版
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/5 18:26 */ @Slf4j(topic = "c.Test26") public class Test26 { static Thread t1,t2,t3; public static void main(String[] args) { ParkUnpark pu = new ParkUnpark(5); t1 = new Thread(() -> { pu.print("a", t2); }); t2 = new Thread(() -> { pu.print("b", t3); }); t3 = new Thread(() -> { pu.print("c", t1); }); t1.start(); t2.start(); t3.start(); LockSupport.unpark(t1);//唤醒t1 } } class ParkUnpark{ private int loopNumber; public ParkUnpark(int loopNumber){ this.loopNumber = loopNumber; } public void print(String str,Thread next){ for (int i = 0; i < loopNumber; i++) { LockSupport.park();//当前线程等待 System.out.print(str); LockSupport.unpark(next);//唤醒下一个线程 } } }