剑指JUC原理-6.wait notify(下)

简介: 剑指JUC原理-6.wait notify

剑指JUC原理-6.wait notify(上):https://developer.aliyun.com/article/1413609


同步模式之保护性暂停


定义


即 Guarded Suspension,用在一个线程等待另一个线程的执行结果


要点:


  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式


实现


class GuardedObject {
  private Object response;
  private final Object lock = new Object();
  public Object get() {
      synchronized (lock) {
      // 条件不满足则等待
        while (response == null) {
          try {
            lock.wait();
          }catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      return response;
    }
  }
  public void complete(Object response) {
    synchronized (lock) {
      // 条件满足,通知等待线程
      this.response = response;
      lock.notifyAll();
    }
  }
}


应用


一个线程等待另一个线程的执行结果


join的局限性是只能等第二个线程结束


用join那种办法,等待结果那个变量只能设计成全局的,而使用保护性暂停的话可以设计成局部的

public static void main(String[] args) {
 GuardedObject guardedObject = new GuardedObject();
 new Thread(() -> {
  try {
    // 子线程执行下载
    List<String> response = download();// 执行下载操作,去完成网页的信息
    log.debug("download complete...");
    guardedObject.complete(response);
  } catch (IOException e) {
    e.printStackTrace();
  }
 }).start();
 log.debug("waiting...");
 // 主线程阻塞等待
 Object response = guardedObject.get();
 log.debug("get response: [{}] lines", ((List<String>) response).size());
}

执行结果:

08:42:18.568 [main] c.TestGuardedObject - waiting...
08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

可以看到,是存在等待时间的,大概5s左右


带超时版 GuardedObject


如果要控制超时时间呢

class GuardedObjectV2 {
  private Object response;
  private final Object lock = new Object();
  public Object get(long millis) {
    synchronized (lock) {
        // 1) 记录最初时间
        long begin = System.currentTimeMillis();
        // 2) 已经经历的时间
        long timePassed = 0;
        while (response == null) {
          // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
          long waitTime = millis - timePassed;
          log.debug("waitTime: {}", waitTime);
          if (waitTime <= 0) {
            log.debug("break...");
            break;
          }
          try {
            lock.wait(waitTime);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
          timePassed = System.currentTimeMillis() - begin;
          log.debug("timePassed: {}, object is null {}", timePassed, response == null);
        }
      return response;
    }
  }
  public void complete(Object response) {
    synchronized (lock) {
      // 条件满足,通知等待线程
      this.response = response;
      log.debug("notify...");
      lock.notifyAll();
    }
  }
}

测试,没有超时

    public static void main(String[] args) {
            GuardedObjectV2 v2 = new GuardedObjectV2();
            new Thread(() -> {
                sleep(1);
                v2.complete(null);
                sleep(1);
                v2.complete(Arrays.asList("a", "b", "c"));
            }).start();
            Object response = v2.get(2500);
            if (response != null) {
                log.debug("get response: [{}] lines", ((List<String>) response).size());
            } else {
                log.debug("can't get response");
            }
        }

输出:

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500
08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...
08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true
08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497
08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...
08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false
08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines

测试,超时

// 等待时间不足
List<String> lines = v2.get(1500);

输出:

08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500
08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify...
08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true
08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498
08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true
08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0
08:47:56.461 [main] c.GuardedObjectV2 - break...
08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response
08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...


join原理


保护性暂停,是一个线程等待另一个线程的结果


而join是一个线程等待另一个线程的结束


进入源码查看 thread.join 和 带超时版 GuardedObject 有异曲同工之妙

进入源码可以发现,join的底层也是使用的保护性暂停的手段


是调用者轮询检查线程 alive 状态

t1.join();
等价于下面的代码
synchronized (t1) {
 // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
 while (t1.isAlive()) {
 t1.wait(0);
 }
}


多任务版 GuardedObject


图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员


如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

新增 id 用来标识 Guarded Object

class GuardedObject {
    // 标识 Guarded Object
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

中间解耦类

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
其中中间解耦类中的共享资源要设置成线程安全的,如synchronized 和 Hashtable

业务相关类

class People extends Thread{
 @Override
 public void run() {
 // 收信
 GuardedObject guardedObject = Mailboxes.createGuardedObject();
 log.debug("开始收信 id:{}", guardedObject.getId());
 Object mail = guardedObject.get(5000);
 log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
 }
}
class Postman extends Thread {
 private int id;
 private String mail;
 public Postman(int id, String mail) {
 this.id = id;
 this.mail = mail;
 }
 @Override
 public void run() {
 GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
 log.debug("送信 id:{}, 内容:{}", id, mail);
 guardedObject.complete(mail);
 }
}

测试

public static void main(String[] args) throws InterruptedException {
 for (int i = 0; i < 3; i++) {
 new People().start();
 }
 Sleeper.sleep(1);
 for (Integer id : Mailboxes.getIds()) {
 new Postman(id, "内容" + id).start();
 }
}

某次运行结果

10:35:05.689 c.People [Thread-1] - 开始收信 id:3
10:35:05.689 c.People [Thread-2] - 开始收信 id:1
10:35:05.689 c.People [Thread-0] - 开始收信 id:2
10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3


异步模式之生产者/消费者


之所以是异步,是因为,产生了一个消息,并不能立刻被消费。而保护性暂停是同步的


定义


要点


  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式


实现


class Message {
    private int id;
    private Object message;
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}
class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}


应用


public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    System.out.println("download...");
                    List<String> response = Downloader.download();
                    System.out.println("try put message()"+ id);
                    messageQueue.put(new Message(id, response));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }
// 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                System.out.println("take message({}):" + message.getId());
            }
        }, "消费者").start();
    }
10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait

多加几个消费者线程本质上也是一样的。


Park & Unpark


基本使用


它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park(); 
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark

Thread t1 = new Thread(() -> {
 log.debug("start...");
 sleep(1);
 log.debug("park...");
 LockSupport.park();
 log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park... 
18:42:54.583 c.TestParkUnpark [main] - unpark... 
18:42:54.583 c.TestParkUnpark [t1] - resume... 

先 unpark 再 park

Thread t1 = new Thread(() -> {
 log.debug("start...");
 sleep(2);
 log.debug("park...");
 LockSupport.park();
 log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark... 
18:43:52.769 c.TestParkUnpark [t1] - park... 
18:43:52.769 c.TestParkUnpark [t1] - resume... 


特点


与 Object 的 wait & notify 相比


  • wait,notify 和 notifyAll 必须配合 Object Monitor (必须先获得monitor锁才可以)一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify


原理之 park & unpark


每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex


原理分为两种情况


先park再unpark

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0


先unpark后park

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0


注意:多次调用unpark也仅仅只是将 counter设置为1


目录
相关文章
|
5月前
|
Java Linux 调度
剑指JUC原理-7.线程状态与ReentrantLock(中)
剑指JUC原理-7.线程状态与ReentrantLock
52 0
|
4月前
|
安全 Java
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
【6月更文挑战第20天】JAVA多线程中,wait(), notify(), notifyAll()是Object类的关键同步机制。wait()让线程等待并释放锁,直到被notify()或notifyAll()唤醒或超时。它们必须在同步块中使用,持有锁的线程调用。notify()唤醒一个等待线程,notifyAll()唤醒所有。最佳实践包括:与synchronized结合,循环检查条件,避免循环内notify(),通常优先使用notifyAll()。
46 0
|
4月前
|
Java 开发者
JAVA多线程通信入门:wait()、notify()、notifyAll()大揭秘!
【6月更文挑战第20天】Java多线程中,`wait()`, `notify()`, `notifyAll()`是Object类的关键通信方法。`wait()`让线程等待并释放锁,`notify()`随机唤醒一个等待的线程,`notifyAll()`唤醒所有。示例展示了在共享资源类中如何使用它们来协调生产者消费者线程。调用前需持有锁,否则抛异常。注意避免死锁和活锁,恰当使用这些方法至关重要。
33 0
|
4月前
|
Java
wait()和notify():JAVA多线程世界的“信号兵”
【6月更文挑战第20天】在Java多线程中,`wait()`和`notify()`作为Object类的方法,扮演着线程间协调者的角色。`wait()`让线程等待并释放锁,`notify()`或`notifyAll()`唤醒等待的线程。在生产者-消费者模型中,它们用于同步访问资源,例如队列。当队列满或空时,线程调用wait()暂停,另一方完成操作后用notify()唤醒。理解并正确使用这些“信号兵”对构建高效的多线程程序至关重要。
24 0
|
5月前
|
Java 调度
多线程(初阶五:wait和notify)
多线程(初阶五:wait和notify)
80 0
|
5月前
|
API
剑指JUC原理-6.wait notify(上)
剑指JUC原理-6.wait notify
77 0
|
5月前
剑指JUC原理-7.线程状态与ReentrantLock(下)
剑指JUC原理-7.线程状态与ReentrantLock
51 0
|
5月前
剑指JUC原理-7.线程状态与ReentrantLock(上)
剑指JUC原理-7.线程状态与ReentrantLock
55 0
|
存储 Java API
面试官:为什么 wait/notify 必须与 synchronized 一起使用??
面试官:为什么 wait/notify 必须与 synchronized 一起使用??
158 0
面试官:为什么 wait/notify 必须与 synchronized 一起使用??
|
安全 Java
Java并发编程之Wait和Notify
Java并发编程之Wait和Notify
127 0
Java并发编程之Wait和Notify