共享模型之管程(2)

简介: 共享模型之管程

共享模型之管程(1)https://developer.aliyun.com/article/1530866

6、Wait/Notify

小故事 - 为什么需要 wait

由于条件不满足,小南不能继续进行计算

但小南如果一直占用着锁,其它人就得一直阻塞,效率太低

于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,

其它人可以由老王随机安排进屋

直到小M将烟送来,大叫一声 [ 你的烟到了 ] (调用 notify 方法)

小南于是可以离开休息室,重新进入竞争锁的队列

(1)原理

  • 锁对象调用wait方法(obj.wait),就会使当前线程进入WaitSet中,变为WAITING状态。
  • 处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
  • BLOCKED状态的线程是在竞争对象时,发现Monitor的Owner已经是别的线程了,此时就会进入EntryList中,并处于BLOCKED状态
  • WAITING状态的线程是获得了对象的锁,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态
  • BLOCKED状态的线程会在锁被释放的时候被唤醒,但是处于WAITING状态的线程只有被锁对象调用了notify方法(obj.notify/obj.notifyAll),才会被唤醒。

注:只有当对象被锁以后,才能调用wait和notify方法

public class Test1 {
  final static Object LOCK = new Object();
  public static void main(String[] args) throws InterruptedException {
        //只有在对象被锁住后才能调用wait方法
    synchronized (LOCK) {
      LOCK.wait();
    }
  }
}Copy

API 介绍

obj.wait() 让进入 object 监视器的线程到 waitSet 等待

obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒

obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

final static Object obj = new Object();
public static void main(String[] args)
{
    new Thread(() - >
    {
        synchronized(obj)
        {
            log.debug("执行....");
            try
            {
                obj.wait(); // 让线程在obj上一直等待下去
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    new Thread(() - >
    {
        synchronized(obj)
        {
            log.debug("执行....");
            try
            {
                obj.wait(); // 让线程在obj上一直等待下去
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    // 主线程两秒后执行
    sleep(2);
    log.debug("唤醒 obj 上其它线程");
    synchronized(obj)
    {
        obj.notify(); // 唤醒obj上一个线程
        // obj.notifyAll(); // 唤醒obj上所有等待线程
    }
}

notify 的一种结果

20:00:53.096 [Thread-0] c.TestWaitNotify - 执行....
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行....
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....

notifyAll 的结果

19:58:15.457 [Thread-0] c.TestWaitNotify - 执行....
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行....
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码....
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....

wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到

notify 为止

wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify

(2)Wait与Sleep的区别

不同点

  • Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
  • Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁
  • Sleep不需要与synchronized一起使用,而Wait需要与synchronized一起使用(对象被锁以后才能使用)

相同点

  • 阻塞状态都为TIMED_WAITING

(3)优雅地使用wait/notify

什么时候适合使用wait

  • 当线程不满足某些条件,需要暂停运行时,可以使用wait。这样会将对象的锁释放,让其他线程能够继续运行。如果此时使用sleep,会导致所有线程都进入阻塞,导致所有线程都没法运行,直到当前线程sleep结束后,运行完毕,才能得到执行。

使用wait/notify需要注意什么

  • 当有多个线程在运行时,对象调用了wait方法,此时这些线程都会进入WaitSet中等待。如果这时使用了notify方法,可能会造成虚假唤醒(唤醒的不是满足条件的等待线程),这时就需要使用notifyAll方法
synchronized (LOCK) {
  while(//不满足条件,一直等待,避免虚假唤醒) {
    LOCK.wait();
  }
  //满足条件后再运行
}
synchronized (LOCK) {
  //唤醒所有等待线程
  LOCK.notifyAll();
}

然后其他线程用while

7、模式之保护性暂停

(1)定义

(2)举例

public class Test2 {
  public static void main(String[] args) {
    String hello = "hello thread!";
    Guarded guarded = new Guarded();
    new Thread(()->{
      System.out.println("想要得到结果");
      synchronized (guarded) {
        System.out.println("结果是:"+guarded.getResponse());
      }
      System.out.println("得到结果");
    }).start();
    new Thread(()->{
      System.out.println("设置结果");
      synchronized (guarded) {
        guarded.setResponse(hello);
      }
    }).start();
  }
}
class Guarded {
  /**
   * 要返回的结果
   */
  private Object response;
  
    //优雅地使用wait/notify
  public Object getResponse() {
    //如果返回结果为空就一直等待,避免虚假唤醒
    while(response == null) {
      synchronized (this) {
        try {
          this.wait();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    return response;
  }
  public void setResponse(Object response) {
    this.response = response;
    synchronized (this) {
      //唤醒休眠的线程
      this.notifyAll();
    }
  }
  @Override
  public String toString() {
    return "Guarded{" +
        "response=" + response +
        '}';
  }
}Copy

带超时判断的暂停

public Object getResponse(long time) {
    synchronized (this) {
      //获取开始时间
      long currentTime = System.currentTimeMillis();
      //用于保存已经等待了的时间
      long passedTime = 0;
      while(response == null) {
        //看经过的时间-开始时间是否超过了指定时间
        long waitTime = time -passedTime;
        if(waitTime <= 0) {
          break;
        }
        try {
                    //等待剩余时间
          this.wait(waitTime);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        //获取当前时间
        passedTime = System.currentTimeMillis()-currentTime   
            }
    }
    return response;
  }Copy

(3)join源码——使用保护性暂停模式

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            while (isAlive()) {
                wait(0);  //一直等待
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

wait():注意,前述提及wait操作对象一定是持有锁的对象,而join方法在方法头中含有Syschronized关键字

拓展

图中Futures就好比居民楼一层的信箱(每个信箱有房间编号),左侧的t0,t2,t4就好比等待邮件的居民,右侧的t1,t3,t5就好比邮递员

如果需要在多个类之间使用GuardedObject对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

邮递员和收信者就不需要互相传递GuardedObject对象

package cn.itcast.test;
import cn.itcast.n2.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
@Slf4j(topic = "c.Test20")
public class Test20 {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        Sleeper.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "内容" + id).start();
        }
    }
}
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        // 收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}
class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
// 增加超时效果
class GuardedObject {
    // 标识 Guarded Object
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

异步模式之生产者消费者

  • 与前面的保护性暂停中的GuardObjecl不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果
  • 数据消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式

java之间线程进行通信,rabbitmq是进程之间进行通信

package cn.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.Test21")
public class Test21 {
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id , "值"+id));
            }, "生产者" + i).start();
        }
        new Thread(() -> {
            while(true) {
                sleep(1);
                Message message = queue.take();
            }
        }, "消费者").start();
    }
}
// 消息队列类 , java 线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    // 消息的队列集合
    private LinkedList<Message> list = new LinkedList<>();
    // 队列容量
    private int capcity;
    public MessageQueue(int capcity) {
        this.capcity = capcity;
    }
    // 获取消息
    public Message take() {
        // 检查队列是否为空
        synchronized (list) {
            while(list.isEmpty()) {
                try {
                    log.debug("队列为空, 消费者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 从队列头部获取消息并返回
            Message message = list.removeFirst();
            log.debug("已消费消息 {}", message);
            list.notifyAll();
            return message;
        }
    }
    // 存入消息
    public void put(Message message) {
        synchronized (list) {
            // 检查对象是否已满
            while(list.size() == capcity) {
                try {
                    log.debug("队列已满, 生产者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 将消息加入队列尾部
            list.addLast(message);
            log.debug("已生产消息 {}", message);
            list.notifyAll();
        }
    }
}
final class Message {
    private int id;
    private Object value;
    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }
    public int getId() {
        return id;
    }
    public Object getValue() {
        return value;
    }
    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

共享模型之管程(3)https://developer.aliyun.com/article/1530876

相关文章
|
20天前
|
Java
共享模型之管程(5)
共享模型之管程
15 0
|
20天前
共享模型之管程(4)
共享模型之管程
12 0
|
20天前
|
存储 安全 Java
共享模型之管程(1)
共享模型之管程
11 0
|
20天前
|
Java
共享模型之管程(3)
共享模型之管程
17 0
|
10月前
|
存储
【线程概念和线程控制】(二)
【线程概念和线程控制】(二)
47 0
|
10月前
|
Linux 编译器 调度
【线程概念和线程控制】(一)
【线程概念和线程控制】(一)
89 0
|
10月前
|
消息中间件 存储 缓存
【多线程系列-01】深入理解进程、线程和CPU之间的关系
【多线程系列-01】深入理解进程、线程和CPU之间的关系
32411 14
|
9月前
|
存储 Linux 调度
多线程——线程概念和线程控制
什么是线程,POSIX线程库,线程控制:pthread_create线程创建,pthread_exit线程终止,pthread_join线程回收,pthread_cancel线程取消,pthread_detach线程分离。线程id和地址空间分局,C++语言级别的多线程,二次封装线程库
88 0
多线程——线程概念和线程控制
|
10月前
|
消息中间件 并行计算 调度
多线程和进程的关系
多线程和进程的关系
64 0
|
存储 Linux 调度
Linux多线程:线程概念、线程间的独有与共享、多线程VS多进程,线程控制:线程创建、线程终止、线程等待、线程分离
Linux多线程:线程概念、线程间的独有与共享、多线程VS多进程,线程控制:线程创建、线程终止、线程等待、线程分离
202 0