1. 线程池
在之前我们写的代码中,用到线程就创建,用完之后线程就消失了,这样会浪费操作系统的资源,也存在一些弊端,通过线程池就可以解决这个问题
线程池是一种线程使用模式,它维护着多个线程,等待着监督管理者分配可并发执行的任务
线程池的核心原理:
- 创建一个空的线程池
- 提交任务时,线程会创建新的线程对象,任务分配完毕,线程归还给线程池,下次再提交任务时,不需要创建新的线程,直接复用已有的线程即可
- 如果提交任务时,线程池中没有空闲线程,也无法创建新的线程,任务就会排队等待
执行的很多代码逻辑都是要用户态的代码和内核态的代码配合完成,而应用程序又有很多,这些应用程序都是由内核统一负责管理和服务,所以内核中的工作可能是非常繁忙的,也就导致了提交给内核的工作可能是不可控的,因此,通常认为,纯用户态操作,就比经过内核的操作效率更高。
Java标准库里也提供了线程池的实现类 ThreadPoolExecutor
下面是四种构造方法,
接下来介绍一下第四个构造方法中的参数都表示什么:
int corePoolSize : 核心线程数 (核心线程会始终存在于线程池内部,非核心线程,繁忙的时候被创建出来,空闲时就释放掉)
int maximumPoolSize : 最大线程数【(核心线程数+非核心线程数)的最大值】
long keepAliveTime: 非核心线程允许空闲的最大时间(超过这个时间就会被释放)
TimeUnit unit: 非核心线程允许空闲的最大时间的单位
BlockingQueue<Runnable> workQueue :工作队列(线程池的工作过程就是一个典型的“生产者消费者模型”,这里的队列就可以指定容量和类型)
ThreadFactory threadFactory: 线程工厂(Thread类的工厂类,通过这个类,完成Thread的实例创建和初始化操作,此处的 threadFactory就可以针对线程池里的线程进行批量的设置属性),一般情况下使用 Executors.defaultThreadFactory()即可
RejectedExecutionHandler handler :拒绝策略
解释:核心线程就是指一直存在于线程池中的线程,两个空闲时间就是值,创建出来的临时线程空闲的时间,超过这个时间就意味着这靠核心线程就足以完成当前提交的任务,就需要销毁临时线程,节约资源,要执行的任务过多时的解决方案指的是,当前线程池中线程的数量已经达到了最大,并且阻塞队列也已经排满了,就需要把多出来的任务踢出去
下面是四种拒绝策略的描述:
不断地提交任务,会有以下三个临界点:
1. 当核心线程满了之后,再提交任务就会排队
2. 当核心线程和阻塞队列都满了之后,就会创建临时线程
3. 当核心线程,阻塞队列,临时线程都满了之后,会触发任务的拒绝策略
1.1. 线程池的使用
Executors是一个工具类,对ThreadPoolExecutor进行了封装,提供了一些常用的方法:
先来演示一下:
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(4); for (int i = 0; i < 100; i++) { service.submit(()->{ Thread current = Thread.currentThread(); System.out.println(current.getName() + ": " + i); }); } }
此时发现 i 出现了错误,其实还是之前一直提到的变量捕获的问题,这里换种解决方法:
每次都创建一个局部变量来接收,就可以了
但是此时还有一个问题,运行程序之后发现程序并没有终止
是因为线程池创建出来的线程默认是“前台线程”,虽然main线程结束了,但是这些线程池的前台线程仍然是存在的,可以使用shutdown方法来把所有的线程终止掉,不过也不能立即终止,要确保所有的任务都执行完毕之后再终止,所以可以再添加一个sleep方法
public class ThreadDemo22 { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(4); for (int i = 0; i < 100; i++) { int id = i; service.submit(()->{ Thread current = Thread.currentThread(); System.out.println(current.getName() + ": " + id); }); } //确保所有任务都执行完毕 Thread.sleep(1000); //终止所有线程 service.shutdown(); } }
1.2. 自定义线程池
根据线程池的特性,还可以模拟实现一个固定线程数目的线程池
先来简单的实现一下:
class MyThreadPoll{ private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000); public MyThreadPoll(int n){ //创建n个线程 for(int i = 0;i < n;i++){ Thread thread = new Thread(()->{ while (true){ //循环从队列中取出任务 try { Runnable runnable = queue.take(); runnable.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); thread.start(); } } //提交任务 public void submit(Runnable runnable){ try { queue.put(runnable); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
上面的自定义线程池中,在构造方法里通过for循环来不断地创建线程,通过向阻塞队列中获取线程,并调用run方法执行任务,在提交任务时就是把任务加入到队列中
当然,上面的线程池是比较简陋的, 一些扩容,拒绝策略等功能还没有实现。
2. 定时器
2.1. 定时器的使用
定时器就相当于是一个闹钟,时间到了之后开始执行某个任务,Java中的定时器是TimerTask类和Timer类,TimerTash是一个抽象类,用于表示一个可以被定时器执行的任务,Timer类用于安排TimerTask
在指定的时间执行,下面来演示一下:
public class ThreadDemo24 { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new TimerTask() { public void run() { System.out.println("hello"); } },2000); System.out.println("程序开始执行..."); } }
也可以多定几个任务:
public class ThreadDemo24 { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new TimerTask() { public void run() { System.out.println("hello3"); } },3000); timer.schedule(new TimerTask() { public void run() { System.out.println("hello2"); } },2000); timer.schedule(new TimerTask() { public void run() { System.out.println("hello1"); } },1000); System.out.println("程序开始执行..."); } }
2.2. 定时器的实现
模拟一个定时器的实现需要以下步骤:
- 创建类,描述一个要执行的任务是什么(任务的内容,任务的时间)
- 管理多个任务(通过一个合适的数据结构把任务存储起来)
- 有专门的线程执行任务
先来描述一下任务:
class MyTimerTask implements Comparable<MyTimerTask> { //任务 private Runnable runnable; //时间,通过毫秒时间戳来表示这个任务具体什么时候执行 private long time; public MyTimerTask(Runnable runnable, long dely) { this.runnable = runnable; this.time = System.currentTimeMillis() + dely; } public void run() { runnable.run(); } public long getTime() { return time; } public int compareTo(MyTimerTask o) { return (int) (this.time - o.time); } }
接下来就是管理任务
关于用什么数据结构来存储这些任务,可以来分析一下,如果说使用顺序表或者链表这样的结构可行吗,这样的话每次查找执行那个任务都需要遍历一遍,效率低下,所以说可以使用堆来存储,可以很好的找到最小值,只需要判断最小的那个任务是否到时间了即可
由于创建的堆是自定义类型,所以说MyTimeTash类还需要实现Comparable接口,重写compareTo方法
再来看MyTimer类:
class MyTimer { private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>(); private Object lock = new Object(); public MyTimer() { //创建线程,来负责执行任务 Thread t = new Thread(() -> { while (true) { synchronized (lock) { if (queue.isEmpty()) { continue; } MyTimerTask current = queue.peek(); //需要执行任务的情况 if (System.currentTimeMillis() >= current.getTime()) { current.run(); //执行过的任务需要从队列中删除 queue.poll(); } else { continue; } } } }); //开启线程执行任务 t.start(); } public void schedule(Runnable runnable, long dely) { synchronized (lock) { MyTimerTask myTimerTask = new MyTimerTask(runnable, dely); queue.offer(myTimerTask); } } }
创建线程之后,不断地从队列中取出任务来执行,执行完的任务要从队列中移除
同时,为了解决线程安全问题,还需要把出队和入队的操作都加上锁
但是,上面的代码还是存在几个潜在的问题的:
- 初始情况下,队列为空,由于continue的存在就会在循环中会反复地加锁释放锁,显然是不行的,所以可以采用等待唤醒机制来修改:
这里吧continue改为wait会更好一点
唤醒的时机就是每次添加任务的时候:
- 如果说当前时间是10:30,定的时间是12:00,那么期间一个半小时走的都是else分支,还是会不断地加锁
所以说这里也需要一个等待,不过可以不用像刚开始那样一直等,因为这时候肯定是知道要等多少时间的,直接采用有参版本就行
q : 如果使用 sleep 可以吗
a : 不可以
- wait可以被唤醒,如果提前加入了一个任务,就需要唤醒线程来执行任务,但是sleep不能唤醒
- sleep是抱着锁睡的,其他线程拿不到锁(1中任务也不能添加)
最后还有一个问题:关于为什么不使用PriorityBlockingQueue
由于PriorityBlockingQueue自身实现的方法本来就已经带锁了,这样就出现了锁的嵌套,就容易出现死锁的问题
下面是优化版的完整代码:
class MyTimerTask implements Comparable<MyTimerTask> { //任务 private Runnable runnable; //时间,通过毫秒时间戳来表示这个任务具体什么时候执行 private long time; public MyTimerTask(Runnable runnable, long dely) { this.runnable = runnable; this.time = System.currentTimeMillis() + dely; } public void run() { runnable.run(); } public long getTime() { return time; } public int compareTo(MyTimerTask o) { return (int) (this.time - o.time); } } class MyTimer { private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>(); private Object lock = new Object(); public MyTimer() { //创建线程,来负责执行任务 Thread t = new Thread(() -> { while (true) { try { synchronized (lock) { if (queue.isEmpty()) { wait(); } MyTimerTask current = queue.peek(); //需要执行任务的情况 if (System.currentTimeMillis() >= current.getTime()) { current.run(); //执行过的任务需要从队列中删除 queue.poll(); } else { lock.wait(current.getTime() - System.currentTimeMillis()); } } } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //开启线程执行任务 t.start(); } public void schedule(Runnable runnable, long dely) { synchronized (lock) { MyTimerTask myTimerTask = new MyTimerTask(runnable, dely); queue.offer(myTimerTask); lock.notify(); } } } public class ThreadDemo25 { public static void main(String[] args) { MyTimer myTimer = new MyTimer(); myTimer.schedule(() -> { System.out.println("hello1"); }, 1000); myTimer.schedule(() -> { System.out.println("hello2"); }, 2000); myTimer.schedule(() -> { System.out.println("hello3"); }, 3000); System.out.println("开始执行"); } }