多线程案例
单例模式
单例模式是一种设计模式
设计模式,就是程序员的棋谱,这里介绍了很多典型场景,以及典型场景的处理方式,按照设计模式写代码,代码写的肯定不会很差。
单例模式对应的场景,有些时候希望有的对象,在整个程序中只有一个实例(对象),只能new一次
写法:
- 饿汉模式(迫切):程序启动,类加载之后,立即创建出实例
class Singleton { private static Singleton instance = new Singleton(); private Singleton() {} public static Singleton getInstance() { return instance; } }
- 懒汉模式(延时):类加载的时候不创建实例,则是在第一次使用实例的时候,再创建,否则,能不创建就不创建
class Singleton { private static Singleton instance = null; private Singleton() {} public static Singleton getInstance() { if (instance == null) { instance = new Singleton(); } return instance; } }
- 懒汉模式-多线程版
上面的懒汉模式的实现是线程不安全的
线程安全问题发生在首次创建实例时. 如果在多个线程中同时调用 getInstance 方法, 就可能导致
创建出多个实例.
一旦实例已经创建好了, 后面再多线程环境调用 getInstance 就不再有线程安全问题了(不再修改
instance 了)
加上 synchronized 可以改善这里的线程安全问题
class Singleton { private static Singleton instance = null; private Singleton() {} public synchronized static Singleton getInstance() { if (instance == null) { instance = new Singleton(); } return instance; } }
- 懒汉模式-多线程版(改进)
以下代码在加锁的基础上, 做出了进一步改动:
- 使用双重 if 判定, 降低锁竞争的频率.
- 给 instance 加上了 volatile
class Singleton { private static volatile Singleton instance = null; private Singleton() {} public static Singleton getInstance() { if (instance == null) {//第一个条件是为了判断是否要加锁,如果不是null就没必要加锁了 synchronized (Singleton.class) {//是null就加锁 if (instance == null) {//第二个条件是判断是否要创建对象,因为在多线程环境下可能有多个线程同时通过第一个条件检查 instance = new Singleton(); } } } return instance; } }
即先判断是否要加锁,再决定是不是真的加锁
加锁 / 解锁是一件开销比较高的事情. 而懒汉模式的线程不安全只是发生在首次创建实例的时候. 因此后续使用的时候, 不必再进行加锁了.
外层的 if 就是判定下看当前是否已经把 instance 实例创建出来了.
同时为了避免 “内存可见性” 导致读取的 instance 出现偏差, 于是补充上 volatile .
当多线程首次调用 getInstance, 大家可能都发现 instance 为 null, 于是又继续往下执行来竞争锁, 其中竞争成功的线程, 再完成创建实例的操作.
当这个实例创建完了之后, 其他竞争到锁的线程就被里层 if 挡住了. 也就不会继续创建其他实例.
这里加上volatile
还有一个作用:避免此处赋值操作的指令重排序
指令重排序:是编译器优化的一种手段,在原有执行逻辑不变的情况下,对代码执行顺序进行调整,是其执行效率变高
在单线程没事,但在多线程就可能出现问题了
比如我们上述代码在instance = new Singleton();
这里我们可以分为3个步骤:
- 给对象创建出内存空间,得到内存地址
- 在空间上调用构造方法,对对象进行初始化
- 把内存地址,赋值个instance引用
这里发生指令重排序,步骤可能就变为132了,同样是在单线程没事,但多线程就不行了
比如线程1在这里执行132,在3执行完后2还没执行,出现了线程切换,线程2执行的时候instance!=null
,就直接返回instance
了,后续可能使用instance
的属性方法之类的,因为还没初始化,就可能出现啥情况了。于是加入volatile
就是避免这样的事情发生
这个**懒汉模式-多线程版(改进)**的关键就3点:
- 加锁
- 双重
if
volatile
阻塞队列
阻塞队列是什么
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则。
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.(削峰)
比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.
- 阻塞队列也能使生产者和消费者之间解耦.
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- 基于链表
- 基于堆,有优先级的
- 基于数组
Array这个版本速度更快,前提是知道有多少个元素。如果不知道有多少个元素就用Linked的
对于BlockingQueue
来说,offer()
和pull()
不带有阻塞功能,put()
和take()
带有阻塞功能
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
阻塞队列实现
- 通过 “循环队列” 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
class MyBlockingQueue{ //使用一个String数组来保存数据,假设只存String private String[] items=new String[1000]; volatile private int head=0;//队头 volatile private int tail=0;//队尾 //队列有效范围 [head,tail),当head和tail重合时,相当于空队列 volatile private int size=0;//使用size表示元素个数 private Object locker=new Object(); //入队列 public void put(String elem) throws InterruptedException { //加锁 //此处的写法相当于直接把synchronized写到方法上了 synchronized (locker){ while(size>=items.length){//此处的while目的不是为了循环,而是借助循环巧妙的实现了wait被唤醒之后再次确认条件 //队列满了 //return; locker.wait();//这里的notify在出队列的方法中 } items[tail++]=elem; if(tail>=items.length){ tail=0; }//这整个判断语句的内容 <=> tail=(tail+1)%items.length,这个代码也能起到,当tail到达末尾就能回到开头 size++; //使用这个notify来唤醒队列空的阻塞情况 locker.notify(); } } //出队列 public String take() throws InterruptedException { //加锁 synchronized (locker){ while(size==0){//此处的while目的不是为了循环,而是借助循环巧妙的实现了wait被唤醒之后,再次确认条件 //队列为空,暂时不能出队列 //return null; locker.wait();//这里的notify在入队列的方法中 } String elem=items[head++]; if(head>=items.length){ head=0; } size--; //使用这个notify来唤醒队列满的阻塞情况 locker.notify(); return elem; } } } //测试代码 public static void main(String[] args) throws InterruptedException { MyBlockingQueue queue=new MyBlockingQueue(); //创建两个线程,表示生产者和消费者 Thread t1=new Thread(()->{ int count=0; while (true){ try { queue.put(count+""); System.out.println("生产元素:"+count); count++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2=new Thread(()->{ while (true){ try { String count=queue.take(); System.out.println("消费元素:"+count); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); }
上述代码是没问题的,但很容易写错在一个地方,就是在put()的wait()的异常处理,这里在idea可以选择抛出异常也可以选择try-catch处理,但如果是try-catch处理,wait()被interrupt唤醒后代码往下走进入catch,方法不会结束而是继续往下执行,就会强行添加元素然后覆盖元素,这是不应该的。而如果像上面一样抛出异常,出现异常后下面就不会继续执行了,就不会出现覆盖元素了。
定时器
定时器是什么
定时器也是软件开发中的一个重要组件. 类似于一个 “闹钟”. 达到一个设定的时间之后, 就执行某个指定好的代码.
定时器是一种实际开发中非常常用的组件.
比如网络通信中, 如果对方 500ms 内没有返回数据, 则断开连接尝试重连.
比如一个 Map, 希望里面的某个 key 在 3s 之后过期(自动删除).
类似于这样的场景就需要用到定时器.
标准库中的定时器
- 标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule .
- schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒).
Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("hello"); } }, 3000);
实现定时器
定时器的构成:
- 一个带优先级的阻塞队列
为啥要带优先级呢?
因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.
- 队列中的每个元素是一个 Task 对象.
- Task 中带有一个时间属性, 队首元素就是即将
- 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
- MyTimer 类提供的核心接口为
schedule
, 用于注册一个任务, 并指定这个任务多长时间后执行.
public class MyTimer { public void schedule(Runnable command, long after) { // TODO } }
- MyTimerTask 类用于描述一个任务. 里面包含一个 Runnable 对象和一个 time(毫秒时间戳)
这个对象需要放到 优先队列 中. 因此需要实现Comparable
接口.
class MyTimerTask implements Comparable<MyTimerTask> { //任务啥时候执行 private long time; //任务具体是啥 private Runnable runnable; public MyTimerTask(Runnable runnable, long delay) { //delay是一个相对时间差 //构造time要根据当前系统时间和delay进行构造 time = System.currentTimeMillis() + delay; this.runnable = runnable; } public long getTime() { return time; } public Runnable getRunnable() { return runnable; } @Override public int compareTo(MyTimerTask o) { //把时间小的优先级高,把它放入队首中 // 怎么记忆, 这里是谁减去谁?? 不要记!! 记容易记错~~ // 随便写一个顺序, 然后实验一下就行了. return (int)(this.time-o.time); //return (int) (o.time - this.time); } }
- MyTimer 实例中, 通过 PriorityBlockingQueue 来组织若干个 MyTimerTask 对象.
通过 schedule 来往队列中插入一个个 MyTimerTask 对象.
class MyTimer { private Object locker=new Object(); //使用优先级队列,来保存N个任务 private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>(); //定时器的核心方法,把要执行的任务添加到队列中 public void schedule(Runnable runnable, long delay) { synchronized(locker){ MyTimerTask task = new MyTimerTask(runnable, delay); queue.offer(task); //每次来新的任务,都唤醒一下之前的扫描线程 locker.notify(); } } }
- MyTimer 类中存在一个 t 线程, 一直不停的扫描队首元素, 看看是否能执行这个任务
所谓 “能执行” 指的是该任务设定的时间已经到达了
- 引入一个 locker 对象, 借助该对象的
wait / notify
来解决while (true)
的忙等问题.
class MyTimer { private Object locker=new Object(); // ... 前面的代码不变 //MyTimer中还需要构造一个“扫描线程”,一方面负责监控队首元素是否到点了,是否应该执行; // 一方面任务到点之后,就要调用Runnable的run方法来完成任务 public MyTimer() { //扫描线程 Thread t = new Thread(() -> { while (true) { try { synchronized(locker){ while (queue.isEmpty()) { //注意如果队列为空就不要取元素 //此处使用wait等待更合适,如果使用continue,就会使这个线程while循环运行的飞快 //也会陷入一个高频占用CPU的状态(忙等) locker.wait(); } MyTimerTask task = queue.peek(); long curTime = System.currentTimeMillis(); if (curTime >= task.getTime()) { //假设当前时间14:01,任务时间14:00,就要应该执行 //需要执行任务 queue.poll(); task.getRunnable().run(); } else { //让当前扫描线程休眠一下,按照时间差来进行休眠 //Thread.sleep(task.getTime() - curTime);可使用sleep就会有很多弊端,具体看7.28这天的板书 locker.wait(task.getTime()-curTime); } } } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } }
修改 MyTimer 的 schedule 方法, 每次有新任务到来的时候唤醒一下 t 线程. (因为新插入的任务可能是需要马上执行的).
class MyTimer { //定时器的核心方法,把要执行的任务添加到队列中 public void schedule(Runnable runnable, long delay) { synchronized(locker){ MyTimerTask task = new MyTimerTask(runnable, delay); queue.offer(task); //每次来新的任务,都唤醒一下之前的扫描线程 locker.notify(); } } }
完整代码:
//创建一个类,描述定时器的一个任务 class MyTimerTask implements Comparable<MyTimerTask> { //任务啥时候执行 private long time; //任务具体是啥 private Runnable runnable; public MyTimerTask(Runnable runnable, long delay) { //delay是一个相对时间差 //构造time要根据当前系统时间和delay进行构造 time = System.currentTimeMillis() + delay; this.runnable = runnable; } public long getTime() { return time; } public Runnable getRunnable() { return runnable; } @Override public int compareTo(MyTimerTask o) { //把时间小的优先级高,把它放入队首中 // 怎么记忆, 这里是谁减去谁?? 不要记!! 记容易记错~~ // 随便写一个顺序, 然后实验一下就行了. return (int)(this.time-o.time); //return (int) (o.time - this.time); } } //定时器类本体 class MyTimer { private Object locker=new Object(); //使用优先级队列,来保存N个任务 private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>(); //定时器的核心方法,把要执行的任务添加到队列中 public void schedule(Runnable runnable, long delay) { synchronized(locker){ MyTimerTask task = new MyTimerTask(runnable, delay); queue.offer(task); //每次来新的任务,都唤醒一下之前的扫描线程 locker.notify(); } } //MyTimer中还需要构造一个“扫描线程”,一方面负责监控队首元素是否到点了,是否应该执行; // 一方面任务到点之后,就要调用Runnable的run方法来完成任务 public MyTimer() { //扫描线程 Thread t = new Thread(() -> { while (true) { try { synchronized(locker){ while (queue.isEmpty()) { //注意如果队列为空就不要取元素 //此处使用wait等待更合适,如果使用continue,就会使这个线程while循环运行的飞快 //也会陷入一个高频占用CPU的状态(忙等) locker.wait(); } MyTimerTask task = queue.peek(); long curTime = System.currentTimeMillis(); if (curTime >= task.getTime()) { //假设当前时间14:01,任务时间14:00,就要应该执行 //需要执行任务 queue.poll(); task.getRunnable().run(); } else { //让当前扫描线程休眠一下,按照时间差来进行休眠 //Thread.sleep(task.getTime() - curTime);可使用sleep就会有很多弊端,具体看7.28这天的板书 locker.wait(task.getTime()-curTime); } } } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } }
线程池
线程池是什么
提前创建好一波线程,后续需要使用线程,就直接从线程池里拿即可
线程池最大的好处就是减少每次启动、销毁线程的损耗。
标准库中的线程池
- 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
- 返回值类型为 ExecutorService
- 通过 ExecutorService.submit 可以注册一个任务到线程池中.
ExecutorService pool = Executors.newFixedThreadPool(10); pool.submit(new Runnable() { @Override public void run() { System.out.println("hello"); } });
Executors 创建线程池的几种方式
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数目动态增长的线程池.
- newSingleThreadExecutor: 创建只包含单个线程的线程池.
- newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.
Executors 本质上是 ThreadPoolExecutor 类的封装.
ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定. (后面再介绍)
实现线程池
- 核心操作为 submit, 将任务加入线程池中
- 使用 t 描述一个工作线程. 使用 Runnable 描述一个任务.
- 使用一个 BlockingQueue 组织所有的任务
- 每个 t 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行.
- 指定一下线程池中的最大线程数 maxWorkerCount; 当当前线程数超过这个最大值时, 就不再新增线程了.
class MyThreadPool{ private BlockingDeque<Runnable> queue=new LinkedBlockingDeque<>(); //通过这个方法把任务添加到线程池中 public void submit(Runnable runnable) throws InterruptedException { queue.put(runnable); } // n 表示线程池里有几个线程 //创建一个固定数量的线程池 public MyThreadPool(int n){ for (int i = 0; i < n; i++) { Thread t=new Thread(()->{ while(true){ try { //取出任务,并执行 Runnable runnable=queue.take(); runnable.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } }
总结-保证线程安全的思路
- 使用没有共享资源的模型
- 适用共享资源只读,不写的模型
- 不需要写共享资源的模型
- 使用不可变对象
- 直面线程安全(重点)
- 保证原子性
- 保证顺序性
- 保证可见性
对比线程和进程
线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
进程与线程的区别
- 进程是系统进行资源分配和调度的一个独立单位,线程是程序执行的最小单位。
- 进程有自己的内存地址空间,线程只独享指令流执行的必要资源,如寄存器和栈。
- 由于同一进程的各线程间共享内存和文件资源,可以不通过内核进行直接通信。
- 线程的创建、切换及终止效率更高。