文章目录
线程池概述
什么是线程池?
线程虽然是轻量级进程,尽管线程比进程创建和销毁所消耗 的资源要少。但是如果线程的创建和销毁频率高了,开销也还是有的,为了进一步提高效率,引入了线程池,池子里面放着事先创建好的线程.后面用的时候直接从池子里面拿,如此速度就快了,但是代价线程池所需的空间,线程池就是以空间换时间。
为什么从线程池拿会比直接创建线程快?
因为创建线程和销毁线程是操作系统完成了,需要从用户态切换到内核态 这是耗时耗力 的。如果从线程池直接拿的话,就省去了切换到内核态的时间,同时当线程不用的时候直接放回到线程池即可。
Java标准库中的线程池
标准库中线程池为ThreadPoolExecutor类,该类中最主要是包含两类线程,一类是核心线程,另一类是非核心线程。当派发任务给线程池中的线程时,干活的是核心线程,当来的活太多了,核心线程不够用了,就会启动非核心线程。当活变少了,就会把非核心线程 给裁了。简单来说所谓的核心线程就像公司里面的正式工,非核心线程则是实习生。当公司人手不够的时候就会招多点实习生来干活,当活少了,实习生也就可以走了。
在Java8中,ThreadPoolExecutor一共提供了4个构造方法,在此主要介绍参数最多的,其他的三个构造方法都是这个构造方法减少参数而来的,所以搞懂了这个参数最多的构造方法,其他的自然而然也明白了。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数解释
corePoolSize:表示核心线程数
maximumPoolSize:池中允许的最大线程数,就是核心线程和非核心线程之和
keepAliveTime:非核心线程在被终止之前等待新任务的最大时间,超过这个时间,该线程就会被停用。
unit:时间单位
workQueue:在任务执行之前用于保存任务的队列,该队列仅将保存submit方法提交的Runnable任务
threadFactory:创建新线程 时所使用的工厂
RejectedExecutionHandler:拒绝策略,执行被处理使用的处理程序,因为达到线程限制和对列容量
拒接策略详解
ThreadPoolExecutor中有四个静态内部类实现了RejectedExecutionHandler接口,分别对应四种不同的拒绝策略
AbortPolicy:被拒绝的任务的处理程序,抛出一个 RejectedExecutionException 。当活太多了,线程已经忙不过来了,还来活时,直接不处理,抛出异常。
CallerRunsPolicy:任务从哪里来就回到哪里去。
DiscardOldestPolicy:队列满了但是不会抛出异常,直接丢弃新任务,不做任何处理
DiscardPolicy:队列满了, 丢弃工作队列中最旧的任务,然后尝试再次提交新任务,不会抛出异常。
常用方法
四种拒接策略演示
AbortPolicy
演示代码
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor( 1, 1, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0; i < 5; i++) { final int taskId = i; threadExecutor.submit(()->someTask(taskId)); } threadExecutor.shutdown(); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(int taskId) { System.out.println("Task " + taskId + " is starting..."); try { Thread.sleep(100); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
任务太多了,抛出异常之后就罢工了,不干活了。
CallerRunsPolicy
演示代码
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor( 1, 1, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 5; i++) { final int taskId = i; threadExecutor.submit(()->someTask(Thread.currentThread().getName(),taskId)); } threadExecutor.shutdown(); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(String name,int taskId) { System.out.println(name+":Task " + taskId + " is starting..."); try { Thread.sleep(100); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
当任务过多时,直接拒接不干了,要干你自己干,所以有部分任务是main线程自己干的
DiscardOldestPolicy
代码演示
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor( 1, 1, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() ); for (int i = 0; i < 5; i++) { final int taskId = i; threadExecutor.submit(()->someTask(Thread.currentThread().getName(),taskId)); } threadExecutor.shutdown(); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(String name,int taskId) { System.out.println(name+":Task " + taskId + " is starting..."); try { Thread.sleep(100); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
DiscardPolicy
代码演示
import java.util.concurrent.*; public class ThreadPoolExecutorDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor( 2, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); for (int i = 0; i < 10; i++) { final int taskId = i; threadExecutor.submit(()->someTask(Thread.currentThread().getName(),taskId)); } threadExecutor.shutdown(); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(String name,int taskId) { System.out.println(name+":Task " + taskId + " is starting..."); try { Thread.sleep(2000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
Executors
如果你觉得上述创建线程池的方式太复杂了,可以使用Executors来创建线程,其返回值是ExecutorService接口。Executors 本质上是 ThreadPoolExecutor 类的封装.
Executors 创建线程池的几种方式
newFixedThreadPool: 创建固定线程数的线程池
newCachedThreadPool: 创建线程数目动态增长的线程池.
newSingleThreadExecutor: 创建只包含单个线程的线程池.
newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.
使用演示
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorsDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { final int taskId = i; executorService.submit(()->someTask(Thread.currentThread().getName(),taskId)); } executorService.shutdown(); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(String name,int taskId) { System.out.println(name+":Task " + taskId + " is starting..."); try { Thread.sleep(2000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
简单实现线程池
线程池的基本逻辑
线程池事先存放着准备好的线程,当有任务提交入池的时候,实际上是放入了阻塞队列中,然后线程池中的线程调度执行这些任务,在java中的线程池有核心线程和非核心线程,我们是简单实现,所以都是以核心线程的方式实现。
实现线程池的基本逻辑
使用阻塞队列组织所有的任务,定义一个线程池类其核心方法为submit()将任务添加到阻塞队列中,还需要一个工作线程不断向阻塞对列扫描获取任务并执行任务。
实现代码
MyThreadPool类实现
import java.util.concurrent.LinkedBlockingQueue; public class MyThreadPool { private int maxWorkerCount = 10; private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue(); public void submit(Runnable command) throws InterruptedException { if (queue.size() < maxWorkerCount) { // 当前 worker 数不足, 就继续创建 worker Worker worker = new Worker(queue); worker.start(); } // 将任务添加到任务队列中 queue.put(command); } }
Worker实现
import java.util.concurrent.LinkedBlockingQueue; public class Worker extends Thread { private LinkedBlockingQueue<Runnable> queue = null; public Worker(LinkedBlockingQueue<Runnable> queue) { super("worker"); this.queue = queue; } @Override public void run() { // try 必须放在 while 外头, 或者 while 里头应该影响不大 try { while (!Thread.interrupted()) { Runnable runnable = queue.take(); runnable.run(); } } catch (InterruptedException e) { } } }
测试代码
public class Demo { public static void main(String[] args) throws InterruptedException { MyThreadPool myThreadPool = new MyThreadPool(); for (int i = 0; i < 5; i++) { final int taskId = i; myThreadPool.submit(() -> someTask(Thread.currentThread().getName(),taskId)); } Thread.sleep(1000); } /** * 定义一个需要并发执行的任务 * * @param taskId */ private static void someTask(String name,int taskId) { System.out.println(name+":Task " + taskId + " is starting..."); try { Thread.sleep(2000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is finished!"); } }
运行结果:
各位看官如果觉得文章写得不错,点赞评论关注走一波!谢谢啦!。