异步模式之生产者/消费者
1. 定义
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(保护暂停模式是一一对应的)
消费队列可以用来平衡生产和消费的线程资源
比如如果使用保护性暂停模式,一个用户需要对应一个邮差,比较浪费。而生产者消费者模式便可以做到多个用户对应一个邮差。
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
这是异步模式,生产者生产的内容并不会被立刻消费,而是先存在于队列中,通过队列被消费者消费。
2. 实现
/** * @author lxy * @version 1.0 * @Description 生产者消费者模式 * @date 2022/6/24 19:08 */ @Slf4j(topic = "c.Test16") public class Test16 { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int id = i;//effectively final new Thread(()->{//lambda 表达式中使用的变量应该是 final 或者有效的 final queue.put(new Message(id,"值"+id)); },"生产者"+i).start(); } new Thread(()->{ while (true){ Sleeper.sleep(1); queue.take(); } },"消费者").start(); } } //消息队列类, java线程之间通信 @Slf4j(topic = "c.MessageQueue") class MessageQueue{ private LinkedList<Message> list = new LinkedList <Message>();//底层实现是一个队列 //队列容量 private int capcity; public MessageQueue(int capcity) { this.capcity = capcity; } //接收消息 public Message take(){ //检查队列是否为空 synchronized (list){ while (list.isEmpty()){ try { log.debug("队列为空,消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //从队列头部获取消息并返回 // return list.removeFirst(); //不能这样写,要分开写。不然没法使用notifyAll唤醒生产者 Message message = list.removeFirst(); log.debug("已消费消息{}",message); list.notifyAll();//消费完唤醒生产者继续生产 return message; } } //存入消息 public void put(Message message){ synchronized (list){ while (list.size() == capcity){ try { log.debug("队列已满,生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //将消息加入队列尾部 list.addLast(message); log.debug("已生产消息{}",message); list.notifyAll(); } } } final class Message{//使用final,表示没有子类继承。没有setter方法,表示在构造初始的时候就创建好,后序不支持修改 private int id;//消息的编号 private Object value;//消息的内容 public Message(int id, Object value) { this.id = id; this.value = value; } public int getId() { return id; } public Object getValue() { return value; } @Override public String toString() { return "Message{" + "id=" + id + ", value=" + value + '}'; } }
输出:
注意:synchronized
锁的是多个线程操作的共享变量,本例中最好使用 list非this,我们使用的也是list.wait()
和list.notifyAll()
。
终止模式之两阶段终止模式
Two Phase Termination
在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
1、错误思路
使用线程对象的 stop() 方法停止线程
stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁
使用 System.exit(int) 方法停止线程
目的仅是停止一个线程,但这种做法会让整个程序都停止
2、两阶段终止模式
图解如下:
如果监控线程在阻塞阶段被interrupt,则捕获异常,手动设置打断标记(因为标记会被清除),下一轮料理后事后结束循环。
如果监控线程在正常工作阶段被interrupt,则会设置打断标记,下一轮料理后事后结束循环。
2.1 利用 isInterrupted
原理:interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait,还是正常运行
代码实现:
@Slf4j(topic = "c.Test10") public class Test10 { public static void main(String[] args) throws InterruptedException { TwoPhaseTermination tpt = new TwoPhaseTermination(); tpt.start(); Thread.sleep(4000); tpt.stop(); } } @Slf4j(topic = "c.TwoPhaseTermination") class TwoPhaseTermination{ private Thread monitor; //启动监控线程 public void start(){ monitor = new Thread(() -> { while (true) { Thread currentThread = Thread.currentThread(); if (currentThread.isInterrupted()) { log.debug("料理后事"); break; } try { Thread.sleep(1000);//情况一:阻塞时interrupt log.debug("执行监控记录");//情况二:正常运行时被interrupt } catch (InterruptedException e) { e.printStackTrace(); currentThread.interrupt();//阻塞被打断后,程序恢复正常运行。再次被打断,将会有一个打断标记(因为此时是处于正常状态的),下一轮会被停止线程。 } } }); monitor.start(); } //停止监控线程 public void stop(){ monitor.interrupt(); } }
运行结果:
2.2 利用停止标记
@Slf4j(topic = "c.Test10") public class Test10 { public static void main(String[] args) throws InterruptedException { TwoPhaseTermination tpt = new TwoPhaseTermination(); tpt.start(); Thread.sleep(4000); log.debug("stop..."); tpt.stop(); } } @Slf4j(topic = "c.TwoPhaseTermination") class TwoPhaseTermination{ //监控线程 private Thread monitor; // 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性 // 我们的例子中,即主线程把它修改为 true 对 monitor 线程可见 private volatile boolean stop = false; //启动监控线程 public void start(){ monitor = new Thread(() -> { while (true) { Thread currentThread = Thread.currentThread(); if (stop) { log.debug("料理后事"); break; } try { Thread.sleep(1000); log.debug("执行监控记录"); } catch (InterruptedException e) { } } },"monitor"); monitor.start(); } //停止监控线程 public void stop(){ stop = true; monitor.interrupt();//不加这个的话,线程会把这一轮执行完才终止 } }
输出:
18:17:14.091 c.TwoPhaseTermination [monitor] - 执行监控记录 18:17:15.099 c.TwoPhaseTermination [monitor] - 执行监控记录 18:17:16.110 c.TwoPhaseTermination [monitor] - 执行监控记录 18:17:17.088 c.Test10 [main] - stop... 18:17:17.088 c.TwoPhaseTermination [monitor] - 料理后事
同步模式之 Balking
**引入:**监控线程主要用来监控CPU、内存的占用等,监控线程只需要一个就行。但是上面的例子我们并没有对线程的数量进行限制。就会出现 每 tpt.start(); 就会创建一个监控线程,这样是毫无意义的。所以我们如何保证方法只执行一次,下次调用直接返回呢?— Balking模式
1. 定义
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
2.实现
例一:对终止模式中的监控线程的改进
**注意:**尽量缩小synchronized块的大小,可以提高并发度和程序的性能
例二:在Tomcat多线程环境下演示
@Service @Slf4j public class MonitorService { private volatile boolean stop; private volatile boolean starting; private Thread monitorThread; public void start() { // 缩小同步范围,提升性能 synchronized (this) { log.info("该监控线程已启动?({})", starting); if (starting) { return; } starting = true; } // 由于之前的 balking 模式,以下代码只可能被一个线程执行,因此无需互斥 monitorThread = new Thread(() -> { while (!stop) { report(); sleep(2); } // 这里的监控线程只可能启动一个,因此只需要用 volatile 保证 starting 的可见性 //也就是让Tomcat中的线程看到starting的修改 log.info("监控线程已停止..."); starting = false; }); stop = false; log.info("监控线程已启动..."); monitorThread.start(); } private void report() { Info info = new Info(); info.setTotal(Runtime.getRuntime().totalMemory()); info.setFree(Runtime.getRuntime().freeMemory()); info.setMax(Runtime.getRuntime().maxMemory()); info.setTime(System.currentTimeMillis()); MonitorController.QUEUE.offer(info); } private void sleep(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } public synchronized void stop() { stop = true; // 不加打断需要等到下一次 sleep 结束才能退出循环,这里是为了更快结束 monitorThread.interrupt(); } }
当前端页面多次点击按钮调用 start 时,最终只创建一个监控线程
输出:
[http-nio-8080-exec-10] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(false) [http-nio-8080-exec-10] cn.itcast.monitor.service.MonitorService - 监控线程已启动... [http-nio-8080-exec-8] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-9] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-7] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [Thread-16] cn.itcast.monitor.service.MonitorService - 监控线程已停止...
它还经常用来实现线程安全的单例
public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static synchronized Singleton getInstance() { if (INSTANCE != null) { return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } }
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。
享元模式
1. 简介
定义 英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时,如字符串,每次都保护性拷贝创建新的对象。当我们已经有取值相同的对象时,直接重用而不用在创建。
wikipedia: A flyweight is an object that minimizes memory usage by sharing as much data as possible with other similar objects
出自 “Gang of Four” design patterns
归类 Structual patterns
2. 体现
2.1 包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
public static Long valueOf(long l) { final int offset = 128; if (l >= -128 && l <= 127) { // will cache return LongCache.cache[(int)l + offset]; } return new Long(l); }
注意:
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Integer的默认范围是 -128~127
最小值不能变
但最大值可以通过调整虚拟机参数 -Djava.lang.Integer.IntegerCache.high 来改变
Boolean 缓存了 TRUE 和 FALSE
2.2 String 串池
参考JVM字符串常量池
2.3 BigDecimal BigInteger
为啥转账案例中我们要使用AtomicReference
来保证线程安全吗?不是BigDecimal
已经是线程安全类了嘛
我们之前讲的这些类(String、BigDecimal …),里面的单个方法都是线程安全的,但是多个方法组合在一起都未必了!
3. DIY
**例如:**一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
数据库连接池就是享元模式的一种具体实现
/** * @author lxy * @version 1.0 * @Description 测试连接池 * @date 2022/7/29 16:47 */ public class Test36 { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(()->{ Connection conn = pool.borrow(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } } /** * 连接池类 */ @Slf4j(topic = "c.Pool") class Pool{ //1.连接池大小 private final int poolSize; //2. 连接池对象数组 private Connection[] connections; //3. 连接状态数组 0 表示空闲, 1 表示繁忙 private AtomicIntegerArray states; //4. 构造方法初始化 public Pool(int poolSize) { this.poolSize = poolSize; this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接"+(i+1)); } } //5. 借连接 public Connection borrow(){ while (true){ for (int i = 0; i < poolSize; i++) { // 获取空闲连接 if(states.get(i)==0){ // states.set(i,1); 这里不能用set哦~ 可能存在线程安全的问题 if (states.compareAndSet(i,0,1)) { log.debug("borrow {}",connections[i]); return connections[i]; } } } // 如果没有空闲连接,当前线程进入等待 synchronized (this){ try { log.debug("wait..."); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } //6. 归还连接 public void free(Connection conn){ for (int i = 0; i < poolSize; i++) { if ((connections[i]==conn)) { states.set(i,0); //唤醒其他等待线程 synchronized (this){ log.debug("free {}",conn); this.notifyAll(); } break;//不再向下循环 } } } } // 连接实现类 class MockConnection implements Connection{ private String name; public MockConnection(String name) { this.name = name; } @Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } //... }
运行结果:
以上实现参考自Tomcat的连接池,我们在实现过程中 没有考虑:
连接的动态增长与收缩
连接保活(可用性检测)
等待超时处理
分布式 hash
具体更多细节,可以参考 阿里大牛手写Tomcat,1300分钟纯干货精讲让你彻底搞懂Tomcat底层
在项目中,对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache commons pool,例如redis连接池可以参考jedis中关于连接池的实