保护性暂停模式

简介: 保护性暂停模式

一、基本概念

保护性暂停(Guarded Suspension):用在一个线程等待另一个线程的执行结果时使用。

保护性暂停的暂停就是当条件不满足的时候就去进行wait等待。

要点

  • 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么此时就不能使用这个保护性暂停模式了,可以使用消息队列(见生产者/消费者)。
  • JDK中,join的实现、Future的实现,采用的就是此模式。(用join一个线程等待另一个线程结束就可以拿到结果了,其实这也是保护性线程的一个应用)
  • 因为要等待另一方的结果,因此归类到同步模式。(关于同步模式的顺序控制实现见文章:同步模式之顺序控制

线程2产生这个结果,然后线程1想要得到这个结果,那就可以让GuardedObject充当一个桥梁,让线程1、2都关联到这个对象上。

二、单任务版GuardedObject

示例代码

t1 等待 t2线程的下载结果

import site.weiyikai.concurrent.utils.Downloader;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
 * Created by lilinchao
 * Date 2022/10/20
 * Description 单任务版GuardedObject (t1 等待 t2线程的下载结果)
 */
@Slf4j(topic = "c.GuardedTest01")
public class GuardedTest01 {
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            // 等待结果
            log.debug("等待结果");
            List<String> list = (List<String>) guardedObject.get();
            log.debug("结果大小:{}", list.size());
        }, "t1").start();
        new Thread(() -> {
            log.debug("执行下载");
            try {
                //读取下载来的数据到List集合当中
                List<String> list = Downloader.download();
                //产生结果
                guardedObject.complete(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }
}
/**
 * wait/notifyAll使用
 */
class GuardedObject {
    // 结果
    private Object response;
    private final Object lock = new Object();
    // 获取结果(没有结果将处于一直等待状态,等待结果的产生)
    public Object get() {
        synchronized (lock) {
            // 没有结果则一直等待
            // 防止虚假唤醒
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 给成员变量赋值
            this.response = response;
            // 产生结果,通知等待线程
            lock.notifyAll();
        }
    }
}
  • Downloader代码
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class Downloader {
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

运行结果

23:45:31.032 c.GuardedTest01 [t2] - 执行下载
23:45:31.032 c.GuardedTest01 [t1] - 等待结果
23:45:32.409 c.GuardedTest01 [t1] - 结果大小:3

升级代码,设置超时时间

如果超过时间还没返回结果,此时就不等了,退出while循环

import site.weiyikai.concurrent.utils.Sleeper;
import lombok.extern.slf4j.Slf4j;
/**
 * Created by lilinchao
 * Date 2022/10/20
 * Description 带超时时间的阻塞获取结果
 */
@Slf4j(topic = "c.GuardedTest02")
public class GuardedTest02 {
    public static void main(String[] args) {
        // 线程1等待线程2的下载结果
        GuardedObjectTime guardeObject = new GuardedObjectTime();
        new Thread(() -> {
            log.debug("begin");
            Object obj = guardeObject.get(2000);
            log.debug("结果是:{}", obj);
        }, "t1").start();
        new Thread(() -> {
            log.debug("begin");
            Sleeper.sleep(1); // 在等待时间内
//            Sleeper.sleep(3);//  超时的情况
            guardeObject.complete(new Object());
        }, "t2").start();
    }
}
class GuardedObjectTime {
    // 结果
    private Object response;
    // 获取结果
    // timeout表示等待多久
    public Object get(long timeout) {
        synchronized (this) {
            // 假如开始时间为 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间(假设 timeout 是 1000,结果在 400 时被唤醒了,那么还有 600 要等)
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间, 退出循环
                if (waitTime <= 0) {
                    break;
                }
                try {
                    // 等待的时间应该 超时时间(timeout) - 经历的时间(passedTime)
                    this.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}
  • 在等待时间内的运行情况
23:50:19.739 c.GuardedTest02 [t1] - begin
23:50:19.739 c.GuardedTest02 [t2] - begin
23:50:20.745 c.GuardedTest02 [t1] - 结果是:java.lang.Object@2e10de1
  • 超时的情况
23:52:16.857 c.GuardedTest02 [t1] - begin
23:52:16.857 c.GuardedTest02 [t2] - begin
23:52:18.861 c.GuardedTest02 [t1] - 结果是:null

三、原理之join

t.join( )方法阻塞调用此方法的线程(calling thread)进入 TIMED_WAITING 状态,直到线程 t 执行完成,此线程再继续


通常用于在main( )主线程内,等待其它线程完成再结束main( )主线程

  • 调用者轮询检查线程 alive 状态
t1.join();
  • 等价于下面的代码
synchronized (t1) {
  // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
  // 此处t1线程对象作为了锁
  while (t1.isAlive()) {
    // 调用线程进了锁t1的waitSet
    // 注意,调用线程不是t1,t1此处是作为锁而不是作为线程
    // 调用线程是其他线程,一般是主线程
    t1.wait(0);
  }
}

join源码

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
                        // 和上面的超时增强原理一样
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 当millis==0时,会进入while( isAlive( ) )循环;即只要子线程是活的,主线程就不停的等待。
  • wait( )的作用是让“当前线程”等待,而这里的“当前线程”是指当前运行的线程。虽然是调用子线程的wait( )方法,但是它是通过“主线程”去调用的;所以,休眠的是主线程,而不是“子线程”!

这样理解: 例子中的Thread t只是一个对象 , isAlive( )判断当前对象(例子中的t对象)是否存活, wait()阻塞的是当前执行的线程(一般是main方法)

可以看出,Join方法实现是通过wait( )。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait( ),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。

四、多任务版 GuardedObject

  • 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
  • 不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个产生结果的线程和使用结果的线程是一一对应的关系,但是生产者消费者模式并不是
  • rpc框架的调用中就使用到了这种模式。

示例代码

送信收信案例

import site.weiyikai.concurrent.utils.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
/**
 * 多任务版 GuardedObject
 */
@Slf4j(topic = "c.GuardedTest03")
public class GuardedTest03 {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        Sleeper.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "内容" + id).start();
        }
    }
}
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        // 收信
        GuardedObject03 guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject03 guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}
class Mailboxes {
    private static Map<Integer, GuardedObject03> boxes = new Hashtable<>();
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    // 根据id得到唯一的GuardedObject,用完给到收信人得移除GuardedObject
    public static GuardedObject03 getGuardedObject(int id) {
        return boxes.remove(id);
    }
    // 产生GuardedObject
    public static GuardedObject03 createGuardedObject() {
        GuardedObject03 go = new GuardedObject03(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    // 得到map中的所有键,也就是id
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
// 增加超时效果
class GuardedObject03 {
    // 标识 Guarded Object
    private int id;
    public GuardedObject03(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

运行结果

00:13:15.571 c.People [Thread-1] - 开始收信 id:1
00:13:15.571 c.People [Thread-2] - 开始收信 id:2
00:13:15.571 c.People [Thread-0] - 开始收信 id:3
00:13:16.570 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
00:13:16.570 c.People [Thread-0] - 收到信 id:3, 内容:内容3
00:13:16.570 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
00:13:16.571 c.People [Thread-2] - 收到信 id:2, 内容:内容2
00:13:16.571 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
00:13:16.571 c.People [Thread-1] - 收到信 id:1, 内容:内容1

五、异步模式之生产者/消费者

5.1 定义

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(一个生产一个消费)
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

与保护性暂停的共同点:都是在多个线程之间进行数据传输;

异步模式中, 生产者产生消息之后消息没有被立刻消费
同步模式中, 消息在产生之后被立刻消费了。

示例

线程间通信的消息队列


注:只是简单实现,功能很基础,了解核心原理即刻

import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import static site.weiyikai.concurrent.utils.Sleeper.sleep;
/**
 * Created by lilinchao
 * Date 2022/10/20
 * Description 生产者和消费者示例
 */
@Slf4j(topic = "c.Test04")
public class Test04 {
    public static void main(String[] args) {
        //1.确定消息队列容量
        MessageQueue queue = new MessageQueue(2);
        //2.生产者线程
        for (int i = 0; i < 3; i++) {
            int id=i;
            new Thread(()->{
                try {
                    queue.put(new Message(id,"值"+id));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"生产者"+i).start();
        }
        //3.消费者线程
        new Thread(()->{
            //3.1每隔1s取一条消息
            while(true){
                try {
                    sleep(2);
                    Message message = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者").start();
    }
}
/**
 * 消息队列类,java线程之间通信
 */
@Slf4j(topic = "c.MessageQueue")
class MessageQueue{
    //1.消息队列集合
    private LinkedList<Message> list=new LinkedList<>();
    //2.规定容量
    private int capcity;
    //3.通过构造方法规定消息队列容量
    public MessageQueue(int capcity) {
        this.capcity = capcity;
    }
    //4.获取消息
    public Message take() throws InterruptedException {
        synchronized (list) {
            //4.1判断队列是否为空
            while (list.isEmpty()) {
                log.debug("队列为空,消费者线程等待");
                list.wait();
            }
            //4.2取出队列中头元素并且返回
            Message message = list.removeFirst();
            log.debug("已消费一个信息");
            //4.3元素取出一个后需要告诉元素-1
            list.notifyAll();
            return message;
        }
    }
    //5.存入消息
    public void put(Message message) throws InterruptedException {
        synchronized (list){
            //5.1检查队列容量是否满?
            while(list.size()==capcity){
                log.debug("队列已满,生产者线程等待");
                list.wait();
            }
            //5.2将消息加入队列尾部
            list.addLast(message);
            log.debug("已生产一个信息");
            list.notifyAll();
        }
    }
}
/**
 * 邮件类,里面放邮件内容及对应id
 * ——>为了实现安全,不能set(只能通过构造方法赋初值)只能get
 */
final class Message{
    private int id;
    private Object value;
    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }
    public int getId() {
        return id;
    }
    public Object getValue() {
        return value;
    }
    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

运行结果

00:23:23.322 c.MessageQueue [生产者0] - 已生产一个信息
00:23:23.324 c.MessageQueue [生产者2] - 已生产一个信息
00:23:23.324 c.MessageQueue [生产者1] - 队列已满,生产者线程等待
00:23:25.321 c.MessageQueue [消费者] - 已消费一个信息
00:23:25.321 c.MessageQueue [生产者1] - 已生产一个信息
00:23:27.322 c.MessageQueue [消费者] - 已消费一个信息
00:23:29.323 c.MessageQueue [消费者] - 已消费一个信息
00:23:31.323 c.MessageQueue [消费者] - 队列为空,消费者线程等待
目录
相关文章
|
8月前
|
存储 算法 安全
【软件设计师备考 专题 】计算机管理(状态转换、共享与互斥、分时轮转、抢占、死锁)
【软件设计师备考 专题 】计算机管理(状态转换、共享与互斥、分时轮转、抢占、死锁)
80 0
|
C#
如何解决在PotPlayer中看视频音画不同步的问题(C#视频可用)
如何解决在PotPlayer中看视频音画不同步的问题(C#视频可用)
1060 0
|
6月前
|
安全
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
|
消息中间件 Java
同步模式之保护性暂停
同步模式之保护性暂停
|
8月前
在程序运行过程中,线程的状态是什么?进来看看就通透了
在程序运行过程中,线程的状态是什么?进来看看就通透了
60 0
|
Java 程序员
同步模式之顺序控制线程执行
同步模式是指在多线程编程中,为了保证线程之间的协作和正确性,需要对线程的执行顺序进行控制。顺序控制线程执行是一种同步模式,它通过控制线程的等待和唤醒来实现线程的有序执行。
149 0
同步模式之顺序控制线程执行
|
Linux 测试技术 调度
Linux调度器何时需触发抢占?—— 从hackbench谈起
作者:何惟禹 吴一昊一、背景:性能之战“不服跑个分”虽然已经沦为手机行业的调侃用语,但在操作系统领域仍然是最重要的评价方式之一。本文的故事也源于一次 Alinux3 与 CentOS8 的一次跑分的较量。当然比分较量并不是目的,更重要的是发现存在的回归缺陷并进行修复,最终让 Alinux3 全方位持平或超过 CentOS8。在本次较量中,我们使用 hackbench 作为跑分软件,我们在测试过程中
2637 0
Linux调度器何时需触发抢占?—— 从hackbench谈起
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
178 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
107 0
MOTOROLA TMCP700 在任何时候暂停或中止数据阶段
|
消息中间件 Java
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)

热门文章

最新文章

下一篇
开通oss服务