Guarded Suspension【生产消费者模式】
一:guarded suspension的参与者
--->guardedObject(被防卫)参与者
1.1该参与者拥有一个被防卫的方法(getRequest),如果警戒条件达成,则执行。警戒条件不达成,则线程进入wait set
1.2该参与者还拥有一个改变参与者状态的方法(putRequest)。参与者的状态影响着警戒条件的是否达成。
--->该模式的角色:生产端线程,消费端线程,传递数据的摇篮(被防卫的参与者)
二:guarded suspension模式什么时候使用
--->适合交易系统使用。客户端下单,服务端处理订单。高并发,大量数据处理的模式,增强服务的吞吐量
三:guarded suspension思考
--->与该模式共通的三个特征
3.1:有循环存在
3.2:有条件测试
3.3:有因某种原因的等待
---> guarded wait 被阻断而等待
等待端范例:
while(条件){
wait();
}
唤醒端范例:
条件=true
notifyAll();
---> busy wait 忙碌地等待
yield,尽可能把优先级交给其他线程,那个线程调用此方法,那个线程暂时一次让出CPU调度权。至于能否暂停,实际看cpu是否去掉别的线程。我主动放弃一次,至于cpu走不走,看cpu的。yield不会解除锁定,所以这段代码不可写在snychronized里。而ready字段必须声明成volatile
等待端范例:
while(ready){
Thead.yield();
}
唤醒端范例:
ready=true
---> spin lock 旋转而锁定
旋转而锁定的意思,表现出条件成立前while循环不断"旋转"的样子,spin lock有时意思与guarded wait相同,有时则与busy wait相同。另外,有时候则是指一开始使用busy wait ,之后再切换成guarded wait方式。另外有些硬件实现的同步机制
--->polling
" 进行舆论调查"的意思,反复检查某个事件是否发生,当发生时就进行对应的处理。
模仿队列(传递数据)
1 package com.yeepay.sxf.thread3; 2 3 import java.util.LinkedList; 4 5 /** 6 * 存放实体的队列 7 * @author sxf 8 * 9 */ 10 public class RequestEntryQueue { 11 //存放请求的链表集合 12 private final LinkedList<RequestEntry> list=new LinkedList<RequestEntry>(); 13 14 15 //从队列中取出请求 16 public synchronized RequestEntry getRequestEntry(){ 17 //判断存放请求的集合是否存在,请求,不存在,就让当前消费着线程进入wait set 18 while (list.size()<=0) { 19 try { 20 wait(); 21 } catch (InterruptedException e) { 22 // TODO Auto-generated catch block 23 e.printStackTrace(); 24 } 25 26 } 27 //如果不为空,则返回第一个请求,并从集合中删除该请求 28 return list.removeFirst(); 29 } 30 31 32 //往队列中放入请求 33 public synchronized void putRequestEntry(RequestEntry requestEntry){ 34 //将新的请求放入队列 35 list.addLast(requestEntry); 36 //唤醒所有wait set中的线程 37 notifyAll(); 38 } 39 }
请求实体
1 package com.yeepay.sxf.thread3; 2 /** 3 * 请求实体 4 * @author sxf 5 * 6 */ 7 public class RequestEntry { 8 //请求人的名字 9 private String name; 10 //线程的名字 11 private String clientThreadName; 12 13 14 public RequestEntry() { 15 super(); 16 } 17 18 19 @Override 20 public String toString() { 21 // TODO Auto-generated method stub 22 return "["+clientThreadName+"生产线程]生产的商品"+name+"被消费"; 23 } 24 25 26 public RequestEntry(String name,String clientThreadName) { 27 28 this.name = name; 29 this.clientThreadName=clientThreadName; 30 31 } 32 33 public String getName() { 34 return name; 35 } 36 37 public void setName(String name) { 38 this.name = name; 39 } 40 41 42 public String getClientThreadName() { 43 return clientThreadName; 44 } 45 46 47 public void setClientThreadName(String clientThreadName) { 48 this.clientThreadName = clientThreadName; 49 } 50 51 52 53 }
客户端线程
1 package com.yeepay.sxf.thread3; 2 3 import java.util.Random; 4 5 /** 6 * 客户端线程 7 * @author sxf 8 * 9 */ 10 public class ClientThread implements Runnable{ 11 //存放请求的队列 12 private RequestEntryQueue requestEntryQueue; 13 //随即数 14 private Random random; 15 //线程名字 16 private String name; 17 18 //构造器 19 public ClientThread(RequestEntryQueue requestEntryQueue, Random random,String name) { 20 super(); 21 this.requestEntryQueue = requestEntryQueue; 22 this.random = random; 23 this.name=name; 24 } 25 26 27 28 29 @Override 30 public void run() { 31 for(int i=0;i<1000;i++){ 32 //生成一个请求 33 RequestEntry requestEntry=new RequestEntry("No"+i,name); 34 //将该请求存入队列 35 requestEntryQueue.putRequestEntry(requestEntry); 36 //让线程休息几秒 37 try { 38 Thread.sleep(random.nextInt(1000)); 39 } catch (InterruptedException e) { 40 // TODO Auto-generated catch block 41 e.printStackTrace(); 42 } 43 } 44 45 } 46 47 48 49 50 }
服务端线程
1 package com.yeepay.sxf.thread3; 2 3 import java.util.Random; 4 5 /** 6 * 服务线程 7 * @author sxf 8 * 9 */ 10 public class ServiceThread implements Runnable{ 11 //持有存放请求的队列 12 private RequestEntryQueue requestEntryQueue; 13 //随即数 14 private Random random; 15 //线程名字 16 private String name; 17 18 19 public ServiceThread(RequestEntryQueue requestEntryQueue, Random random, 20 String name) { 21 super(); 22 this.requestEntryQueue = requestEntryQueue; 23 this.random = random; 24 this.name = name; 25 } 26 27 28 @Override 29 public void run() { 30 while(true){ 31 RequestEntry requestEntry=requestEntryQueue.getRequestEntry(); 32 System.out.println("【消费者线程"+name+"】----------->"+requestEntry.toString()); 33 34 try { 35 Thread.sleep(random.nextInt(1000)); 36 } catch (InterruptedException e) { 37 // TODO Auto-generated catch block 38 e.printStackTrace(); 39 } 40 } 41 42 } 43 44 45 }
测试类
1 package com.yeepay.sxf.thread3; 2 3 import java.util.Random; 4 5 /** 6 * 测试类 7 * @author sxf 8 * 9 */ 10 public class Test { 11 12 public static void main(String[] args) { 13 //声明一个队列 14 RequestEntryQueue requestEntryQueue=new RequestEntryQueue(); 15 //声明两个生产者线程 16 Thread clientThread1=new Thread(new ClientThread(requestEntryQueue, new Random(), "QQ客户端")); 17 Thread clientThread2=new Thread(new ClientThread(requestEntryQueue, new Random(), "ALIBABA客户端")); 18 //声明三个消费者线程 19 Thread serviceThread1=new Thread(new ServiceThread(requestEntryQueue, new Random(), "易宝支付")); 20 Thread serviceThread2=new Thread(new ServiceThread(requestEntryQueue, new Random(), "支付宝")); 21 Thread serviceThread3=new Thread(new ServiceThread(requestEntryQueue, new Random(), "财付通")); 22 //开启线程 23 clientThread1.start(); 24 clientThread2.start(); 25 26 serviceThread1.start(); 27 serviceThread2.start(); 28 serviceThread3.start(); 29 30 } 31 }