《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)

简介: 《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式

同步模式之保护性暂停

1. 定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

a87126c8b5ec00706ef2808bd20f7d5c.png

2. 简单版 GuardedObject

/**
 * @author lxy
 * @version 1.0
 * @Description 一个线程等待另一个线程的执行结果
 * @date 2022/6/23 15:13
 */
@Slf4j(topic = "c.Test13")
public class Test13 {
    //线程1 等待 线程2 的下载结果
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
            log.debug("等待结果");
            List <String>  list = (List <String>) guardedObject.get();
            log.debug("结果大小:{}",list.size());
        },"t1").start();
        new Thread(()->{
            log.debug("执行下载");
            try {
                List <String> list = Downloader.download();
                guardedObject.create(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}
class GuardedObject{
    //结果
    private Object response;
    //获取结果
    public Object get(){
        synchronized (this){
            while (response == null){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }
    //产生结果
    public void create(Object response){
        synchronized (this){
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

765fc0c0d6e79e155c7e03645ae8cffb.png

3. 带超时版 GuardedObject

/**
 * @author lxy
 * @version 1.0
 * @Description 带超时版 GuardedObject
 * @date 2022/6/24 11:37
 */
@Slf4j(topic = "c.Test14")
public class Test14 {
    //线程 1 等待 线程2的下载结束
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
            log.debug("begin");
            Object response = guardedObject.get(2000);
            log.debug("结果是{}",response);
        },"t1").start();
        new Thread(()->{
            log.debug("begin");
           // 情况一
            Sleeper.sleep(1);
            guardedObject.create(new Object());
            //情况二
            // Sleeper.sleep(3);
            // guardedObject.create(new Object());
            // 情况三
            // Sleeper.sleep(1);
            // guardedObject.create(null);
        },"t2").start();
    }
}
class GuardedObject{
    //结果
    private Object response;
    //获取结果
    public Object get(long timeout){
        synchronized (this){
            //开始时间
            long begin = System.currentTimeMillis();
            //经历的时间
            long passedTime = 0;
            while (response == null){
                long waitTime = timeout - passedTime;
                //经历的时间超过了最大等待时间,退出循环
                if(waitTime <= 0){
                    break;
                }
                try {
                    this.wait(waitTime);//为了防止虚假唤醒,等待时间为waitTime
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求经历时间
                passedTime = System.currentTimeMillis()-begin;
            }
            return response;
        }
    }
    //产生结果
    public void create(Object response){
        synchronized (this){
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

e2006e1c56f805b43195c265a94325d4.png

25cb6f5e7a4cbb37e8f76d4322ee0a3a.png

0e4ebcffffd1dabf4b614b8b7d4c04d8.png

*. 原理之 join

4. 多任务版 GuardedObject


图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右

侧的 t1,t3,t5 就好比邮递员

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,

这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理


337974d058be3615cc0a274e7aff7282.png

新增 id 用来标识 Guarded Object

class GuardedObject{
    private int id;
    //...
}


中间解耦类

/**
 *
 * 注意:这里的MailBoxes选择了静态类而不是选择单例,为什么?
 *   因为MailBoxes就相当于工具类呀!!!!这就是个处理读写共享资源的工具类,为了解藕而这么写的,和MVC架构差不多
 */
//邮件类
class Mailboxes{
    //由于boxes是线程安全的,所以多线程对其操作也不再使用synchronized修饰
    private static Map<Integer, GuardedObject> boxes = new Hashtable <>();
    //表示信件id
    private static int id = 1;
    //产生唯一id
    private static synchronized int generateId(){
        return id++;
    }
    //根据id获取邮件
    public static GuardedObject getGuardedObject(int id){
        return boxes.remove(id);
    }
    //将一个邮件放在邮箱中
    public static GuardedObject createGuardedObject(){
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(),go);
        return go;
    }
    //获得所有待送邮件的id
    public static Set<Integer> getIds(){
        return boxes.keySet();
    }
}

业务相关类

//邮差类
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
    private int id;
    private String mail;//信件内容
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}",id,mail);
        guardedObject.create(mail);
    }
}
//居民类
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        //收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();//创建信件
        log.debug("开始收信 id:{}",guardedObject.getId());
        Object mail = guardedObject.get(5000);//进行收件
        log.debug("收到信 id:{},内容:{}",guardedObject.getId(),mail);
    }
}

测试:

@Slf4j(topic = "c.Test15")
public class Test15 {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new People().start();//初始化信件和用户线程,有多少个用户线程就对应着有多少信件
        }
        Sleeper.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id,"内容"+id).start();//邮递员开始根据id将信送入信箱
        }
    }
}

运行结果:

ed60835e645893d2ddce2700396baf0c.png

同步模式之顺序控制

1. 固定运行顺序

比如,必须先t2线程打印 2 后 t1线程再 打印1。

1.1 wait notify 版

/**
 * @author lxy
 * @version 1.0
 * @Description  固定线程运行顺序:先让线程2执行,再线程1执行
 * @date 2022/7/5 15:46
 */
@Slf4j(topic = "c.Test21")
public class Test21 {
    static final Object lock = new Object();
    //表示t2是否运行过
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (!t2runned) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                log.debug("2");
                t2runned = true;
                lock.notify();
            }
        }, "t2");
        t1.start();
        t2.start();
    }
}

输出:

15:39:39.232 c.Test21 [t2] - 2
15:39:39.234 c.Test21 [t1] - 1

1.2 await signal 版

/**
 * @author lxy
 * @version 1.0
 * @Description  固定线程运行顺序:先让线程2执行,再线程1执行
 * @date 2022/7/5 15:46
 */
@Slf4j(topic = "c.Test22")
public class Test22 {
    static ReentrantLock ROOM = new ReentrantLock();
    static Condition waitSet = ROOM.newCondition();
    //表示t2是否运行过
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            ROOM.lock();
            try {
                while (!t2runned) {
                    try {
                        waitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            } finally {
                ROOM.unlock();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            ROOM.lock();
            try {
                log.debug("2");
                t2runned = true;
                waitSet.signal();
            } finally {
                ROOM.unlock();
            }
        }, "t2");
        t1.start();
        t2.start();
    }
}


1.3 Park Unpark版本


可以看到,实现上很麻烦:


首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait

第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题

最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:


@Slf4j(topic = "c.Test23")
public class Test23 {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            LockSupport.park();//阻塞t1线程
            log.debug("1");
        }, "t1");
        t1.start();
        new Thread(()->{
            log.debug("2");
            LockSupport.unpark(t1);//让t1开始运行
        },"t2").start();
    }
}

2. 交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

2.1 wait notify 版

/**
 * @author lxy
 * @version 1.0
 * @Description 
 * @date 2022/7/5 17:19
 */
public class Test24 {
    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify(1, 5);//定义公共锁对象
        new Thread(()->{
            wn.print("a",1,2);
        },"t1").start();
        new Thread(()->{
            wn.print("b",2,3);
        },"t2").start();
        new Thread(()->{
            wn.print("c",3,1);
        },"t3").start();
    }
}
/**
 * 输出内容   等待标记  下一个标记
 *    a         1       2
 *    b         2       3
 *    c         3       1
 *
 * 关键点:使用waitFlag,来决定本线程是等待还是继续向运行。使用nextFlag来让哪个线程放弃等待,继续运行
 *
 */
class WaitNotify{
    private int flag;
    private int loopNumber;
    public WaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }
    //打印
    public void print(String str,int waitFlag,int nextFlag){
        for (int i = 0; i < 5; i++) {//连续5轮
            synchronized (this){
                while (flag != waitFlag){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                flag = nextFlag;//更新标记
                this.notifyAll();//唤醒阻塞的线程,让符合的进行执行
            }
        }
    }
}

2.2 Lock 条件变量版

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/5 17:55
 */
public class Test25 {
    public static void main(String[] args) {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();
        new Thread(()->{
            awaitSignal.print("a",a,b);
        },"t1").start();
        new Thread(()->{
            awaitSignal.print("b",b,c);
        },"t2").start();
        new Thread(()->{
            awaitSignal.print("c",c,a);
        },"t3").start();
        Sleeper.sleep(1);
        awaitSignal.lock();
        try {
            System.out.println("开始打印...");
            a.signal();
        }finally {
            awaitSignal.unlock();
        }
    }
}
class AwaitSignal extends ReentrantLock{
    private int loopNumber;
    public AwaitSignal(int loopNumber){
        this.loopNumber = loopNumber;
    }
    //打印数据。  current:当前线程要进入哪一间休息室。next:下一间休息室
    public void print(String str, Condition current,Condition next){
        for (int i = 0; i < 5; i++) {
            lock();
            try {
                current.await();
                System.out.print(str);
                next.signal();//敲醒另外一个线程,打印数据
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                unlock();
            }
        }
    }
}

输出:

开始打印...
abcabcabcabcabc

2.3 Park Unpark 版

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/5 18:26
 */
@Slf4j(topic = "c.Test26")
public class Test26 {
    static Thread t1,t2,t3;
    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        t1 = new Thread(() -> {
            pu.print("a", t2);
        });
        t2 = new Thread(() -> {
            pu.print("b", t3);
        });
        t3 = new Thread(() -> {
            pu.print("c", t1);
        });
        t1.start();
        t2.start();
        t3.start();
        LockSupport.unpark(t1);//唤醒t1
    }
}
class ParkUnpark{
    private int loopNumber;
    public ParkUnpark(int loopNumber){
        this.loopNumber = loopNumber;
    }
    public void print(String str,Thread next){
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();//当前线程等待
            System.out.print(str);
            LockSupport.unpark(next);//唤醒下一个线程
        }
    }
}
相关文章
|
5月前
|
机器学习/深度学习 Java 数据挖掘
线程操纵术之更优雅的并行策略问题之并发和并行有区别问题如何解决
线程操纵术之更优雅的并行策略问题之并发和并行有区别问题如何解决
|
5月前
|
安全
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决
|
5月前
|
存储 设计模式 监控
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
53 0
|
7月前
|
监控
两阶段终止模式和Balking模式
两阶段终止模式和Balking模式
57 1
架构系列——线程通信的实现方式
架构系列——线程通信的实现方式
|
Java 程序员
同步模式之顺序控制线程执行
同步模式是指在多线程编程中,为了保证线程之间的协作和正确性,需要对线程的执行顺序进行控制。顺序控制线程执行是一种同步模式,它通过控制线程的等待和唤醒来实现线程的有序执行。
145 0
同步模式之顺序控制线程执行
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
175 0
生产者消费者模式
生产者消费者模式
|
缓存 监控 安全
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
|
设计模式 安全
【多线程:设计模式】保护性暂停的应用与扩展
【多线程:设计模式】保护性暂停的应用与扩展
226 0

热门文章

最新文章

相关实验场景

更多