《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)

简介: 《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式

同步模式之保护性暂停

1. 定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

a87126c8b5ec00706ef2808bd20f7d5c.png

2. 简单版 GuardedObject

/**
 * @author lxy
 * @version 1.0
 * @Description 一个线程等待另一个线程的执行结果
 * @date 2022/6/23 15:13
 */
@Slf4j(topic = "c.Test13")
public class Test13 {
    //线程1 等待 线程2 的下载结果
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
            log.debug("等待结果");
            List <String>  list = (List <String>) guardedObject.get();
            log.debug("结果大小:{}",list.size());
        },"t1").start();
        new Thread(()->{
            log.debug("执行下载");
            try {
                List <String> list = Downloader.download();
                guardedObject.create(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}
class GuardedObject{
    //结果
    private Object response;
    //获取结果
    public Object get(){
        synchronized (this){
            while (response == null){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }
    //产生结果
    public void create(Object response){
        synchronized (this){
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

765fc0c0d6e79e155c7e03645ae8cffb.png

3. 带超时版 GuardedObject

/**
 * @author lxy
 * @version 1.0
 * @Description 带超时版 GuardedObject
 * @date 2022/6/24 11:37
 */
@Slf4j(topic = "c.Test14")
public class Test14 {
    //线程 1 等待 线程2的下载结束
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
            log.debug("begin");
            Object response = guardedObject.get(2000);
            log.debug("结果是{}",response);
        },"t1").start();
        new Thread(()->{
            log.debug("begin");
           // 情况一
            Sleeper.sleep(1);
            guardedObject.create(new Object());
            //情况二
            // Sleeper.sleep(3);
            // guardedObject.create(new Object());
            // 情况三
            // Sleeper.sleep(1);
            // guardedObject.create(null);
        },"t2").start();
    }
}
class GuardedObject{
    //结果
    private Object response;
    //获取结果
    public Object get(long timeout){
        synchronized (this){
            //开始时间
            long begin = System.currentTimeMillis();
            //经历的时间
            long passedTime = 0;
            while (response == null){
                long waitTime = timeout - passedTime;
                //经历的时间超过了最大等待时间,退出循环
                if(waitTime <= 0){
                    break;
                }
                try {
                    this.wait(waitTime);//为了防止虚假唤醒,等待时间为waitTime
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求经历时间
                passedTime = System.currentTimeMillis()-begin;
            }
            return response;
        }
    }
    //产生结果
    public void create(Object response){
        synchronized (this){
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

e2006e1c56f805b43195c265a94325d4.png

25cb6f5e7a4cbb37e8f76d4322ee0a3a.png

0e4ebcffffd1dabf4b614b8b7d4c04d8.png

*. 原理之 join

4. 多任务版 GuardedObject


图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右

侧的 t1,t3,t5 就好比邮递员

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,

这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理


337974d058be3615cc0a274e7aff7282.png

新增 id 用来标识 Guarded Object

class GuardedObject{
    private int id;
    //...
}


中间解耦类

/**
 *
 * 注意:这里的MailBoxes选择了静态类而不是选择单例,为什么?
 *   因为MailBoxes就相当于工具类呀!!!!这就是个处理读写共享资源的工具类,为了解藕而这么写的,和MVC架构差不多
 */
//邮件类
class Mailboxes{
    //由于boxes是线程安全的,所以多线程对其操作也不再使用synchronized修饰
    private static Map<Integer, GuardedObject> boxes = new Hashtable <>();
    //表示信件id
    private static int id = 1;
    //产生唯一id
    private static synchronized int generateId(){
        return id++;
    }
    //根据id获取邮件
    public static GuardedObject getGuardedObject(int id){
        return boxes.remove(id);
    }
    //将一个邮件放在邮箱中
    public static GuardedObject createGuardedObject(){
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(),go);
        return go;
    }
    //获得所有待送邮件的id
    public static Set<Integer> getIds(){
        return boxes.keySet();
    }
}

业务相关类

//邮差类
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
    private int id;
    private String mail;//信件内容
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}",id,mail);
        guardedObject.create(mail);
    }
}
//居民类
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        //收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();//创建信件
        log.debug("开始收信 id:{}",guardedObject.getId());
        Object mail = guardedObject.get(5000);//进行收件
        log.debug("收到信 id:{},内容:{}",guardedObject.getId(),mail);
    }
}

测试:

@Slf4j(topic = "c.Test15")
public class Test15 {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new People().start();//初始化信件和用户线程,有多少个用户线程就对应着有多少信件
        }
        Sleeper.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id,"内容"+id).start();//邮递员开始根据id将信送入信箱
        }
    }
}

运行结果:

ed60835e645893d2ddce2700396baf0c.png

同步模式之顺序控制

1. 固定运行顺序

比如,必须先t2线程打印 2 后 t1线程再 打印1。

1.1 wait notify 版

/**
 * @author lxy
 * @version 1.0
 * @Description  固定线程运行顺序:先让线程2执行,再线程1执行
 * @date 2022/7/5 15:46
 */
@Slf4j(topic = "c.Test21")
public class Test21 {
    static final Object lock = new Object();
    //表示t2是否运行过
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (!t2runned) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                log.debug("2");
                t2runned = true;
                lock.notify();
            }
        }, "t2");
        t1.start();
        t2.start();
    }
}

输出:

15:39:39.232 c.Test21 [t2] - 2
15:39:39.234 c.Test21 [t1] - 1

1.2 await signal 版

/**
 * @author lxy
 * @version 1.0
 * @Description  固定线程运行顺序:先让线程2执行,再线程1执行
 * @date 2022/7/5 15:46
 */
@Slf4j(topic = "c.Test22")
public class Test22 {
    static ReentrantLock ROOM = new ReentrantLock();
    static Condition waitSet = ROOM.newCondition();
    //表示t2是否运行过
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            ROOM.lock();
            try {
                while (!t2runned) {
                    try {
                        waitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            } finally {
                ROOM.unlock();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            ROOM.lock();
            try {
                log.debug("2");
                t2runned = true;
                waitSet.signal();
            } finally {
                ROOM.unlock();
            }
        }, "t2");
        t1.start();
        t2.start();
    }
}


1.3 Park Unpark版本


可以看到,实现上很麻烦:


首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait

第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题

最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:


@Slf4j(topic = "c.Test23")
public class Test23 {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            LockSupport.park();//阻塞t1线程
            log.debug("1");
        }, "t1");
        t1.start();
        new Thread(()->{
            log.debug("2");
            LockSupport.unpark(t1);//让t1开始运行
        },"t2").start();
    }
}

2. 交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

2.1 wait notify 版

/**
 * @author lxy
 * @version 1.0
 * @Description 
 * @date 2022/7/5 17:19
 */
public class Test24 {
    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify(1, 5);//定义公共锁对象
        new Thread(()->{
            wn.print("a",1,2);
        },"t1").start();
        new Thread(()->{
            wn.print("b",2,3);
        },"t2").start();
        new Thread(()->{
            wn.print("c",3,1);
        },"t3").start();
    }
}
/**
 * 输出内容   等待标记  下一个标记
 *    a         1       2
 *    b         2       3
 *    c         3       1
 *
 * 关键点:使用waitFlag,来决定本线程是等待还是继续向运行。使用nextFlag来让哪个线程放弃等待,继续运行
 *
 */
class WaitNotify{
    private int flag;
    private int loopNumber;
    public WaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }
    //打印
    public void print(String str,int waitFlag,int nextFlag){
        for (int i = 0; i < 5; i++) {//连续5轮
            synchronized (this){
                while (flag != waitFlag){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                flag = nextFlag;//更新标记
                this.notifyAll();//唤醒阻塞的线程,让符合的进行执行
            }
        }
    }
}

2.2 Lock 条件变量版

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/5 17:55
 */
public class Test25 {
    public static void main(String[] args) {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();
        new Thread(()->{
            awaitSignal.print("a",a,b);
        },"t1").start();
        new Thread(()->{
            awaitSignal.print("b",b,c);
        },"t2").start();
        new Thread(()->{
            awaitSignal.print("c",c,a);
        },"t3").start();
        Sleeper.sleep(1);
        awaitSignal.lock();
        try {
            System.out.println("开始打印...");
            a.signal();
        }finally {
            awaitSignal.unlock();
        }
    }
}
class AwaitSignal extends ReentrantLock{
    private int loopNumber;
    public AwaitSignal(int loopNumber){
        this.loopNumber = loopNumber;
    }
    //打印数据。  current:当前线程要进入哪一间休息室。next:下一间休息室
    public void print(String str, Condition current,Condition next){
        for (int i = 0; i < 5; i++) {
            lock();
            try {
                current.await();
                System.out.print(str);
                next.signal();//敲醒另外一个线程,打印数据
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                unlock();
            }
        }
    }
}

输出:

开始打印...
abcabcabcabcabc

2.3 Park Unpark 版

/**
 * @author lxy
 * @version 1.0
 * @Description
 * @date 2022/7/5 18:26
 */
@Slf4j(topic = "c.Test26")
public class Test26 {
    static Thread t1,t2,t3;
    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        t1 = new Thread(() -> {
            pu.print("a", t2);
        });
        t2 = new Thread(() -> {
            pu.print("b", t3);
        });
        t3 = new Thread(() -> {
            pu.print("c", t1);
        });
        t1.start();
        t2.start();
        t3.start();
        LockSupport.unpark(t1);//唤醒t1
    }
}
class ParkUnpark{
    private int loopNumber;
    public ParkUnpark(int loopNumber){
        this.loopNumber = loopNumber;
    }
    public void print(String str,Thread next){
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();//当前线程等待
            System.out.print(str);
            LockSupport.unpark(next);//唤醒下一个线程
        }
    }
}
相关文章
|
14天前
|
Java 程序员
Java多线程编程是指在一个进程中创建并运行多个线程,每个线程执行不同的任务,并行地工作,以达到提高效率的目的
【6月更文挑战第18天】Java多线程提升效率,通过synchronized关键字、Lock接口和原子变量实现同步互斥。synchronized控制共享资源访问,基于对象内置锁。Lock接口提供更灵活的锁管理,需手动解锁。原子变量类(如AtomicInteger)支持无锁的原子操作,减少性能影响。
23 3
|
11月前
|
Linux 编译器 调度
【线程概念和线程控制】(一)
【线程概念和线程控制】(一)
92 0
|
11月前
|
存储
【线程概念和线程控制】(二)
【线程概念和线程控制】(二)
47 0
|
12月前
|
消息中间件 Java
同步模式之保护性暂停
同步模式之保护性暂停
|
11月前
|
Java
架构系列——线程通信的实现方式
架构系列——线程通信的实现方式
|
Java 程序员
同步模式之顺序控制线程执行
同步模式是指在多线程编程中,为了保证线程之间的协作和正确性,需要对线程的执行顺序进行控制。顺序控制线程执行是一种同步模式,它通过控制线程的等待和唤醒来实现线程的有序执行。
112 0
同步模式之顺序控制线程执行
|
消息中间件 Cloud Native Java
线程同步模式之保护性暂停
保护性暂停是一种同步模式,用于保护共享资源的完整性。在多线程或多进程环境中,如果多个线程或进程同时访问共享资源,可能会导致数据不一致或者竞态条件等问题
123 0
|
缓存 监控 安全
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
【多线程】基础 | 线程的状态
大家好,上一篇主要主要是对多线程的了解,以及对Thread的start方法,进行了源码的跟踪,今天我们主要来说一说其他的状态。
|
监控
【多线程:两阶段终止模式】volatile版本
【多线程:两阶段终止模式】volatile版本
93 0