【1】常见接口和实现类
① 什么是线程池
首先可以联想一下数据库连接池,Redis中的pool。
线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。
② 常见的体系结构
常见的线程池体系结构:
java.util.concurrent.Executor : 负责线程的使用与调度的根接口 |--ExecutorService 子接口: 线程池的主要接口 |--ThreadPoolExecutor 线程池的实现类 |--ScheduledExecutorService 子接口:负责线程的调度 |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
如下图所示:
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
③ 线程池的创建
为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子(hook)。但是,强烈建议程序员使用较为方便的Executors 工厂方法:
Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)
Executors.newFixedThreadPool(int)(固定大小线程池)
Executors.newSingleThreadExecutor()(单个后台线程)
Executors.newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
需要注意的是,它们均为大多数使用场景预定义了设置,也就是说通常情况下可以直接使用无需额外配置。但是实际上还有一个创建线程池的方法那就是手动构造线程池(ThreadPoolExecutor)
如下所示:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorService executorService1 = Executors.newFixedThreadPool(5); ExecutorService executorService2 = Executors.newSingleThreadExecutor();
Executors.newCachedThreadPool():
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
Executors.newFixedThreadPool(int):
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newSingleThreadExecutor():
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors.newScheduledThreadPool():
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
【2】线程池使用实例
线程池提交任务的三个方法:
<T> Future<T> submit(Callable<T> task); Future<?> submit(Runnable task); <T> Future<T> submit(Runnable task, T result);
实例代码如下:
package com.jane.controller; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * Created by Janus on 2018/9/28. */ public class TestThreadPool { public static void main(String[] args) throws Exception { //1.创建线程池 ExecutorService pool = Executors.newFixedThreadPool(5); //参数为Runnable ThreadPoolRunnable threadPoolRunnable = new ThreadPoolRunnable(); // 2.为线程池中的线程分配任务 // for (int i = 0; i <10 ; i++) { // pool.submit(threadPoolRunnable); // } //参数为Callable ThreadPoolCallable threadPoolCallable = new ThreadPoolCallable(); List<Future<Integer>> futures = new ArrayList<>(); for (int i = 0; i <10 ; i++) { Future<Integer> future = pool.submit(threadPoolCallable); futures.add(future); } for (Future<Integer> future : futures) { System.out.println(future.get()); } //3.关闭线程池 pool.shutdown(); } } class ThreadPoolRunnable implements Runnable{ private int i = 0; @Override public void run() { while(i <= 100){ System.out.println(Thread.currentThread().getName() + " : " + i++); } } } class ThreadPoolCallable implements Callable<Integer>{ @Override public Integer call() throws Exception { int sum =0; for(int i=1;i<=100;i++){ sum+=i; } System.out.println(Thread.currentThread().getName() + " : " +sum); return sum; } }
【3】线程池调度实例
ScheduledExecutorService使用实例代码如下:
package com.jane.controller; import java.util.Date; import java.util.Random; import java.util.concurrent.*; /** * Created by Janus on 2018/9/29. */ public class TestScheduledThreadPool { public static void main(String[] args){ System.out.println("项目启动 : "+new Date()); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); // 延迟两秒执行ScheduledThread ScheduledThread scheduledThread = new ScheduledThread(); scheduledExecutorService.schedule(scheduledThread,2, TimeUnit.SECONDS); //项目启动时延迟一秒,然后以2秒为周期执行 // ScheduledRunnable scheduledRunnable = new ScheduledRunnable(); // scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable,1,2,TimeUnit.SECONDS); // 上一次结束后 延迟一秒,然后以2秒为周期执行 // scheduledExecutorService.scheduleWithFixedDelay(scheduledRunnable,1,2,TimeUnit.SECONDS); } } class ScheduledThread implements Callable<Integer>{ @Override public Integer call() throws Exception { int i = new Random().nextInt(100); System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date()); return i; } } class ScheduledRunnable implements Runnable{ @Override public void run() { int i = (int) (Math.random()*1000); System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date()); } }
测试一,单独执行schedule:
ScheduledThread scheduledThread = new ScheduledThread(); scheduledExecutorService.schedule(scheduledThread,2, TimeUnit.SECONDS);
效果如下:
项目启动 : Sat Sep 29 10:37:16 CST 2018 pool-1-thread-1 : 69 , Sat Sep 29 10:37:19 CST 2018
在主程序启动2秒后执行,效果正常。
测试二,单独执行scheduleAtFixedRate:
ScheduledRunnable scheduledRunnable = new ScheduledRunnable(); scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable,1,2,TimeUnit.SECONDS);
效果如下:
项目启动 : Sat Sep 29 10:38:51 CST 2018 pool-1-thread-1 : 475 , Sat Sep 29 10:38:52 CST 2018 pool-1-thread-1 : 444 , Sat Sep 29 10:38:54 CST 2018 pool-1-thread-2 : 37 , Sat Sep 29 10:38:56 CST 2018 pool-1-thread-1 : 619 , Sat Sep 29 10:38:58 CST 2018 pool-1-thread-3 : 296 , Sat Sep 29 10:39:00 CST 2018 pool-1-thread-3 : 304 , Sat Sep 29 10:39:02 CST 2018
主程序启动,一秒延迟后,以2秒为间隔周期执行。
测试三,单独执行scheduleWithFixedDelay:
主程序启动,一秒延迟后,以2秒为间隔周期执行。 测试三,单独执行scheduleWithFixedDelay: Sch
效果如下:
项目启动 : Sat Sep 29 10:42:41 CST 2018 pool-1-thread-1 : 473 , Sat Sep 29 10:42:42 CST 2018 pool-1-thread-1 : 667 , Sat Sep 29 10:42:44 CST 2018 pool-1-thread-2 : 788 , Sat Sep 29 10:42:46 CST 2018 pool-1-thread-1 : 836 , Sat Sep 29 10:42:48 CST 2018 pool-1-thread-3 : 457 , Sat Sep 29 10:42:50 CST 2018 pool-1-thread-3 : 923 , Sat Sep 29 10:42:52 CST 2018 pool-1-thread-3 : 120 , Sat Sep 29 10:42:54 CST 2018
看起来和测试二一样?
测试四,添加ScheduledRunnable2 ,run中sleep一秒:
添加一个类ScheduledRunnable2 如下:
class ScheduledRunnable2 implements Runnable{ @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } int i = (int) (Math.random()*1000); System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date()); } }
测试代码如下:
ScheduledRunnable2 scheduledRunnable2 = new ScheduledRunnable2(); scheduledExecutorService.scheduleWithFixedDelay(scheduledRunnable2,1,2,TimeUnit.SECONDS);
效果如下:
项目启动 : Sat Sep 29 10:51:15 CST 2018 pool-1-thread-1 : 521 , Sat Sep 29 10:51:17 CST 2018 pool-1-thread-1 : 334 , Sat Sep 29 10:51:20 CST 2018 pool-1-thread-2 : 388 , Sat Sep 29 10:51:23 CST 2018 pool-1-thread-1 : 528 , Sat Sep 29 10:51:26 CST 2018 pool-1-thread-3 : 862 , Sat Sep 29 10:51:29 CST 2018 pool-1-thread-3 : 312 , Sat Sep 29 10:51:32 CST 2018 pool-1-thread-3 : 534 , Sat Sep 29 10:51:35 CST 2018 pool-1-thread-3 : 452 , Sat Sep 29 10:51:38 CST 2018
启动后,延迟一秒,再sleep一秒,然后打印第一次调用;之后等待上一次结束后以2秒周期间隔再执行。
注意,后面间隔都是三秒,充分说明了第二次调用是在第一次调用结束后(因此每次执行都会sleep 1 秒)!
测试五 ,使用scheduleAtFixedRate调用scheduledRunnable2:
测试代码如下:
ScheduledRunnable2 scheduledRunnable2 = new ScheduledRunnable2(); scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable2,1,2,TimeUnit.SECONDS);
效果如下:
项目启动 : Sat Sep 29 10:57:27 CST 2018 pool-1-thread-1 : 500 , Sat Sep 29 10:57:29 CST 2018 pool-1-thread-1 : 991 , Sat Sep 29 10:57:31 CST 2018 pool-1-thread-2 : 1 , Sat Sep 29 10:57:33 CST 2018 pool-1-thread-1 : 241 , Sat Sep 29 10:57:35 CST 2018 pool-1-thread-3 : 92 , Sat Sep 29 10:57:37 CST 2018 pool-1-thread-3 : 517 , Sat Sep 29 10:57:39 CST 2018
项目启动后,延迟一秒,又sleep 1 秒,然后打印第一次调用;之后以2秒间隔周期进行线程调用!!
由此可见FixedDelay与FixedRate区别:
- FixedDelay 上一次结束后,等待X秒,执行下一次;
- FixedRate 两次调用间隔X秒。
【4】Fork/Join 框架
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join 汇总。
示意图如下:
实例代码模拟如下:
package com.jane.controller; import java.time.Duration; import java.time.Instant; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * Created by Janus on 2018/9/29. */ public class TestForkJoinPool { public static void main(String[] args){ //开始时间 Instant start = Instant.now(); //forkJoinTask执行离不开ForkJoinPool ForkJoinPool pool = new ForkJoinPool(); // ForkJoinTask<Long> task = new ForkJoinSumCalculate(0, 1000000000L); ForkJoinTask<Long> task = new ForkJoinSumCalculate(0, 50000000000L); Long sum = pool.invoke(task); System.out.println(sum); //使用JDK1.8新特性--日期--结束时间 Instant end = Instant.now(); long millis = Duration.between(start, end).toMillis();//842-33044 System.out.println("耗费时间为 : "+millis); } } class ForkJoinSumCalculate extends RecursiveTask<Long>{ private long start; private long end; //拆分临界值 private static final long THURSHOLD = 10000L; public ForkJoinSumCalculate(long start,long end){ this.start = start; this.end = end; } //The main computation performed by this task. @Override protected Long compute() { long length = end-start; //小于临界值,直接求和 if (length<THURSHOLD){ long sum = 0L; for(long i=start;i<=end;i++){ sum+=i; } return sum; }else{ //否则,进行递归拆分 long middle = (start+end)/2; ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); left.fork();//进行拆分,同时压入线程队列 ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end); right.fork(); return left.join()+right.join(); } } }
分别使用for循环和java8新特性测试如下:
// for 循环 @Test public void testFor(){ Instant start = Instant.now(); long sum = 0L; // for(long i = 0;i<=1000000000L;i++){ for(long i = 0;i<=50000000000L;i++){ sum+=i; } System.out.println(sum); Instant end = Instant.now(); long millis = Duration.between(start, end).toMillis();//2526--22117 System.out.println("耗费时间为 : "+millis); } // java8新特性--并行流 @Test public void test2(){ Instant start = Instant.now(); // long sum = LongStream.rangeClosed(0L, 1000000000L).parallel().reduce(0L, Long::sum); long sum = LongStream.rangeClosed(0L, 50000000000L).parallel().reduce(0L, Long::sum); System.out.println(sum); Instant end = Instant.now(); long millis = Duration.between(start, end).toMillis();//1699--18909 System.out.println("耗费时间为 : "+millis); }
总结如下:
- 第一求和数据比较小的时候无需考虑哪种方式;
- 第二,求和数据比较大的时候java8新特性是优异于for循环的;
- 第三,求和数据比较大的时候使用ForkJoin需要考虑临界值的设置,否则可能效率不如for循环和java8新特性。
【5】Fork/Join 框架与线程池的区别
① Fork/Join采用“工作窃取”模式(work-stealing)
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
② Fork/Join优势
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。
在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。
③ ForkJoinPool继承图
如下图所示ForkJoinPool是AbstractExecutorService子类实现了ExecutorService接口(其是Executor接口子类)。
【6】ThreadPoolExecutor
前面提到的Executors的几个静态方法的实现,其实就用到了ThreadPoolExecutor,只是根据不同的场景传入了不同的参数。完整的ThreadPoolExecutor总共有七个参数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:当线程数大于corePoolSize时,这是多余的空闲线程在终止前等待新任务的最长时间
unit:keepAliveTime 存活时间的单位
workQueue:工作任务队列,用于保存任务并将其传递给工作线程的队列。
threadFactory:构造线程池中线程的工厂
handler:由于达到线程边界和队列容量而阻止执行时要使用的处理程序
解释如下:
提交任务的时候,判断当前线程池中的存活线程数量是否小于corePoolSize
如果小于corePoolSize,则不管是否有线程处于空闲状态,都会新建一个线程。
如果线程数量已经达到corePoolSize,则将任务扔进队列workQueue
随着任务越来越多,队列可能已经满了,则需要看当前线程是否已经达到了maximumPoolSize,如果没有达到,则创建新的线程,并用它执行该任务。
最坏情况,任务实在太多了,队列已经满了,而线程数量已经达到maximumPoolSize,还有新的任务来,说白了,就是已经满负荷了,任然还有任务需要执行,这个时候就会handler来处理该任务了。
另外三个参数keepAliveTime、unit 和factory 说明如下:
keepAliveTime。如我们所知,使用线程池的目的就是为了减少线程的创建,因为创建线程本身是比较耗资源的。由于线程本身需要占用资源,有一种情况就是,某个时候线程数量比较多,但是任务没有多少,就会出现有的线程没有活干,所以我们就可以考虑释放掉其资源,但是呢,我们又无法预知未来的任务量,所以我们就准许其空闲一段时间,如果过了这段时间都还是空闲的,那么就会释放掉其资源,这个参数就是用指定这段空闲时间的。默认情况下是有超过corePoolSize个线程时,就会用到该值, 但是也可以指定corePoolSize数量之内的线程空闲时是否释放资源(allowCoreThreadTimeout)。
unit 这个参数很好理解,就是单位,就是前面keepAliveTime这个我们准许空闲的时间的单位
factory .其类型为ThreadFactory,顾名思义,就是一个创建Thread的Factory. 该接口只有一个方法,产生一个Thread。通常情况下,我们都只需要使用默认的factory就可以了,但是为了定位问题方便,我们可以为线程池创建的线程设置一个名字,这样看日志的时候就比较方便了。
【7】RejectedExecutionHandler
采用Executors静态方法时,并没有让传入该参数,默认值如下:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
实际上jdk本身提供了四种策略分别是:
- AbortPolicy:会抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
- CallerRunnerPolicy:在调用execute的方法中执行被拒绝的任务
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
- DiscardOldestPolicy:丢掉队列中最老的任务,然后重试
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
- DiscardPolicy:直接丢掉该任务
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
另外还有ThreadPoolExecutor还提供了一些hook方法,如有需要可以使用
- beforeExecute() 任务执行之前调用
- afterExecute() 任务执行之后调用