虽然多线程的技术大大帮助了程序运行的效率,但是在太多的线程的创建与销毁下,系统的开销也将会是非常庞大的。所以为了实现线程的可管理性,并且降低开销。则JUC中提供了线程池的概念,以及相关实现方法
一、线程池的介绍
面试题:线程池的实现
创建一个阻塞队列来容纳任务,在第一次执行任务时创建足够多的线程,并处理任务,之后每个工作线程自动从任务队列中获取线程,直到任务队列中任务为0为止,此时线程处于等待状态,一旦有工作任务加入任务队列中,即刻唤醒工作线程进行处理,实现线程的可复用性。
创建四种线程池的方法
方法名 | 描述 |
newCachedThreadPool() |
创建一个可以根据需要自动调整线程数量的线程池。 |
newFixedThreadPool(int nThreads) |
创建具有固定数量线程的线程池。 |
newSingleThreadScheduledExecutor() |
创建仅包含单个线程的线程池,可以按照计划安排任务的执行。 |
newScheduledThreadPool(int corePoolSize) |
创建具有固定核心线程数量的线程池,可以按照计划安排任务的执行,并且具有一定数量的可变大小线程池。 |
四种线程池的作用
newCachedThreadPool()
:创建一个可以根据需要自动调整线程数量的线程池。适用于执行大量短期异步任务的场景,线程池会根据任务的数量自动增加或减少线程的数量。newFixedThreadPool(int nThreads)
:创建具有固定数量线程的线程池。适用于需要控制并发线程数的场景,线程数量固定不变。newSingleThreadScheduledExecutor()
:创建仅包含单个线程的线程池,可以按照计划安排任务的执行。适用于需要按顺序执行任务的场景,每个任务都会在前一个任务完成后再执行。newScheduledThreadPool(int corePoolSize)
:创建具有固定核心线程数量的线程池,可以按照计划安排任务的执行,并且具有一定数量的可变大小线程池。适用于需要根据计划安排任务执行时间的场景,核心线程数固定,但可以根据需要增加额外的线程来处理更多任务。
创建线程池
Executor类创建的线程池主要通过两类接口描述:ExecutorService(线程池)和ScheduledExecutorService(调度线程池)
线程池常用的方法
方法名 | 描述 |
execute(Runnable command) |
按顺序执行给定的命令,提交到线程池中进行执行。 |
submit(Runnable task) |
提交一个可运行的任务给线程池,并返回一个表示该任务的未决结果的 Future。 |
submit(Callable task) |
提交一个可调用的任务给线程池,并返回一个表示该任务的未决结果的 Future。 |
invokeAll(Collection<? extends Callable<T>> tasks) |
在给定的任务列表中的所有任务完成之前,按顺序执行并等待每个任务的完成,并返回一个表示所有任务结果的 Future 列表。 |
invokeAny(Collection<? extends Callable<T>> tasks) |
执行给定的任务列表,返回其中一个已经成功完成的任务的结果,并取消所有其他任务。 |
isShutdown() |
如果线程池已经调用了 shutdown() 或 shutdownNow() 方法,返回 true 。否则返回 false 。 |
shutdown() |
平滑地关闭线程池,停止接受新任务,并尝试将所有未完成的任务继续执行。 |
schedule() |
安排在给定的延迟后执行任务,并返回一个表示该任务的未决结果的 ScheduledFuture 。 |
二、线程池的创建
面试题:线程池中的线程是怎么创建的?
四种线程对象的创建
1.案例代码:创建缓存线程池(建立了一个线程池,而且线程数量是没有限制的(当然,不能超过Integer的最大值),新增一个任务即有一个线程处理,或者复用之前空闲的线程,或者重亲启动一个线程,但是一旦一个线程在60秒内一直处于等待状态时(也就是一分钟无事可做),则会被终止)
package Example2135; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class javaDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); // 如果线程池中的线程不够用,则会自动新创建线程,线程最多为Integer.MAX_VALUE个 for (int i=0;i<100;i++){ executorService.submit(()->{ System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName()); }); } if (!executorService.isShutdown()){ executorService.shutdown(); } } }
编辑
注意:关于execute与submit方法的关系
- 返回值类型:
execute()
方法没有返回值,仅用于提交可运行的任务;而submit()
方法将提交的任务封装成一个Future
对象,可以通过该对象获取任务执行的结果。- 参数类型:
execute()
方法接受一个Runnable
类型的参数,用于提交不需要返回结果的任务;而submit()
方法可以接受Runnable
或者Callable
类型的参数,用于提交既可以返回结果也可以不返回结果的任务。- 异常处理:
execute()
方法无法捕获任务执行过程中的异常,如果任务抛出异常,线程池无法感知;而submit()
方法可以捕获任务执行过程中的异常,并以Future
形式返回异常信息。综上所述,
execute()
更适合用于执行简单的、无需返回结果的任务,而submit()
则更灵活,可以用来执行有返回结果的任务,并且可以捕获任务的异常。
2.案例代码:创建固定长度的线程池(在初始化时已经决定了线程的最大数量,若任务添加的能力超出了线程的处理能力,则建立阻塞队列容纳多余的任务)
package Example2136; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class javaDemo { public static void main(String[] args) { // 创建固定长度为4的线程池 ExecutorService executorService = Executors.newFixedThreadPool(4); // 让线程输出自己的默认名称 for(int i=0;i<10;i++){ executorService.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } executorService.shutdown(); } }
编辑
3.案例代码:创建单线程(顾名思义就是一个池中只有一个线程在运行,该线程永不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理)
package Example2137; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class javaDemo { public static void main(String[] args) { // 创建单线程对象 ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); for (int i=0;i<10;i++){ executorService.submit(()->{ System.out.println(Thread.currentThread().getName()); }); } executorService.shutdown(); } }
编辑
4.案例代码:创建调度线程池
package Example2139; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class javaDemo { public static void main(String[] args) { // 创建调度线程池,并设置内核线程数量为1 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); // 设置调度的任务,并设置3秒后执行,之后每隔2秒执行一次 scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { Date date = new Date(); System.out.println(Thread.currentThread().getName()+"为您播报:当前时间为"+ date); } },3,2, TimeUnit.SECONDS); } }
编辑
如果在线程池中传入了Callable接口实例,那么可以通过Future接口获取返回的结果在ExecutorService接口中提供了invokeAny和invokeAll两个方法可以实现一组Callable实例的执行
案例代码:执行一组Callable实例
package Example2140; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.*; public class javDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { // 保存多个线程对象 Set<Callable<String>> allThread = new HashSet<Callable<String>>(); // 集合中追加线程 for (int i=0;i<10;i++){ final int temp = i; allThread.add(()->{ return Thread.currentThread().getName()+temp; }); } // 创建固定长度的线程 ExecutorService service = Executors.newFixedThreadPool(3); // 使用Future接收类型 List<Future<String>> list = service.invokeAll(allThread); for (Future<String> future : list){ System.out.println(future.get()); } } }
编辑
三、特殊线程池
3.1.CompletionService异步处理
CompletionService的主要功能是可以通过异步处理获取到线程池返回的结果,CompletionService可以接收Callable或者Runnable实现的线程任务。并且可以通过ExecutorCompletionService子类实例化接口对象
什么是异步处理
异步处理指的是在任务执行期间,不需要等待任务完成就可以继续执行其他操作。通过异步处理,可以提高程序的响应速度和效率。在传统的同步处理中,必须等待一个任务执行完毕后才能执行下一个任务,而异步处理则可以同时执行多个任务,提高了任务的并发性。
案例:使用CompletionService接口获取异步执行任务结果
package Example2141; import java.util.concurrent.*; //线程体 class ThreadItem implements Callable<String> { @Override public String call() throws Exception { // 获取当前时间戳 long TimeMillis = System.currentTimeMillis(); try { System.out.println("[start]"+Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(3); System.out.println("[end]"+Thread.currentThread().getName()); }catch (Exception e){} return Thread.currentThread().getName()+":"+ TimeMillis; } } public class javaDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); // 信息生产者 for (int i=0;i<10;i++){ // 提交线程 completionService.submit(new ThreadItem()); } for (int i=0;i<10;i++){ System.out.println("获取数据"+completionService.take().get()); } // 关闭线程池 executorService.shutdown(); } }
3.2.ThreadPoolExecutor
通过Executors类可以进行实现线程池的创建,而通过Executors类的创建都是基于ThreadPoolExecutor类的实现创建。在一些特殊的环境下,开发者也可以直接使用ThreadPoolExecutor类结合阻塞队列与拒绝策略实现属于自己的线程池。
什么是拒绝策略
拒绝策略(Rejection Policy)是在线程池中,当提交的任务超过线程池容量且无法处理时,决定如何处理这些任务的一种策略。当线程池中的工作队列(阻塞队列)已满,并且没有空闲的线程可以执行任务时,就会触发拒绝策略。
四种拒绝策略
拒绝策略 | 描述 |
AbortPolicy | 默认策略,当线程池无法处理新任务时,直接抛出RejectedExecutionException 异常,表示拒绝执行该任务。 |
CallerRunsPolicy | 当线程池无法处理新任务时,将任务返回给调用者进行处理。也就是由调用submit 方法的线程来执行该任务。 |
DiscardOldestPolicy | 当线程池无法处理新任务时,会丢弃最早进入工作队列的任务,并尝试重新提交当前任务。 |
DiscardPolicy | 当线程池无法处理新任务时,会默默丢弃无法处理的任务,不给予任何提示。该策略可能会导致任务丢失,慎用。 |
案例代码:
package Example2142; import java.util.concurrent.*; public class javaDemo { public static void main(String[] args) { BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2); // 通过ThreadPoolExecutor创建线程池,该线程有2个内核线程,最大线程量为2,每个线程存活时间为6秒,使用默认拒绝策略 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,6L, TimeUnit.SECONDS,queue, Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for (int i=0;i<5;i++){ threadPoolExecutor.submit(()->{ System.out.println("[BEFORE]"+Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); }catch (Exception e){} System.out.println("[AFTER]" +Thread.currentThread().getName()); }); } } }
编辑
3.3 ForkJoinPool
在JDK1.7以后为了充分利用多核CPU的性能优势,可以将一个复杂的业务计算进行拆分,交由多台CPU并行计算,这样用以提高程序的执行性能,所以引入了ForkJoinPool类,该类包含两个操作
Fork(分解操作):将一个大型业务拆分成多个小的任务放在框架中执行
Join(合并操作):主任务将等待多个子任务完成后进行合并
在ForkJoinPool中有两个子类,RecursiveTask(有返回值的任务)、RecursiveAction(无返回值的任务)
该类常用的方法:
方法 | 说明 |
fork() |
将任务分解为更小的子任务并异步执行。将当前任务放入工作队列中,以供其他工作线程获取并执行。 |
join() |
等待当前任务的执行结果,并返回结果。如果任务还没有完成,则调用线程会被阻塞,直到任务完成后才会继续执行。 |
isCompleteNormally() |
判断任务是否已经正常完成。如果任务在完成时没有抛出异常,该方法返回true ;如果任务被取消或者发生异常,返回false 。 |
invokeAll(tasks) |
执行给定的任务集合,并等待所有任务完成。该方法会将任务集合拆分为更小的子任务,并由线程池中的工作线程异步执行。 |
getException() |
获取任务执行过程中所发生的异常。如果任务正常完成或者尚未完成,该方法返回null ;如果任务被取消或者异常中止,返回引发异常的原因。 |
案例代码:
package Example2143; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; class SumTask extends RecursiveTask<Integer> { private int start; private int end; public SumTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if (this.end - this.start < 100) { for (int x = this.start; x <= this.end; x++) { sum += x; } } else { int middle = (start + end) / 2; SumTask leftTask = new SumTask(this.start, middle); SumTask rightTask = new SumTask(middle + 1, this.end); leftTask.fork(); rightTask.fork(); // 等待子任务完成并将结果相加 sum = leftTask.join() + rightTask.join(); } return sum; } } public class javaDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { SumTask task = new SumTask(0,100); ForkJoinPool pool = new ForkJoinPool(); Future<Integer> future = pool.submit(task); System.out.println(future.get()); } }
编辑